pacemaker 2.1.5-a3f44794f94
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
/*
 * Copyright 2004-2021 the Pacemaker project contributors
 *
 * The version control history for this file may have further details.
 *
 * This source code is licensed under the GNU Lesser General Public License
 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
 */
9
11#include <bzlib.h>
12#include <sys/socket.h>
13#include <netinet/in.h>
14#include <arpa/inet.h>
15#include <netdb.h>
16
17#include <crm/common/ipc.h>
19#include <crm/common/mainloop.h>
20#include <sys/utsname.h>
21
22#include <qb/qbipc_common.h>
23#include <qb/qbipcc.h>
24#include <qb/qbutil.h>
25
26#include <corosync/corodefs.h>
27#include <corosync/corotypes.h>
28#include <corosync/hdb.h>
29#include <corosync/cpg.h>
30
31#include <crm/msg_xml.h>
32
33#include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
34#include "crmcluster_private.h"
35
36/* @TODO Once we can update the public API to require crm_cluster_t* in more
37 * functions, we can ditch this in favor of cluster->cpg_handle.
38 */
39static cpg_handle_t pcmk_cpg_handle = 0;
40
41// @TODO These could be moved to crm_cluster_t* at that time as well
42static bool cpg_evicted = false;
43static GList *cs_message_queue = NULL;
44static int cs_message_timer = 0;
45
46struct pcmk__cpg_host_s {
47 uint32_t id;
48 uint32_t pid;
49 gboolean local;
51 uint32_t size;
52 char uname[MAX_NAME];
53} __attribute__ ((packed));
54
55typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
56
57struct pcmk__cpg_msg_s {
58 struct qb_ipc_response_header header __attribute__ ((aligned(8)));
59 uint32_t id;
60 gboolean is_compressed;
61
64
65 uint32_t size;
66 uint32_t compressed_size;
67 /* 584 bytes */
68 char data[0];
69
70} __attribute__ ((packed));
71
72typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
73
74static void crm_cs_flush(gpointer data);
75
/* Number of payload bytes actually present in a message's data field.
 *
 * The argument is parenthesized so that any pointer expression (for example
 * "&local_msg") can be passed, not just a plain identifier.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
77
/* Evaluate a Corosync API call, retrying while it reports a transient
 * failure (CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL).
 *
 * rc      - lvalue that receives the result of each attempt
 * counter - lvalue counting failed attempts so far; the caller must reset it
 *           to 0 before each use
 * max     - stop retrying once counter reaches this value
 * code    - expression to evaluate (re-evaluated on every attempt)
 *
 * After the Nth transient failure the macro sleeps N seconds, so the delay
 * grows with each retry. Arguments are parenthesized so that arbitrary
 * expressions can be passed safely.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            crm_debug("Retrying operation after %ds", (counter));           \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
88
94void
96{
97 pcmk_cpg_handle = 0;
98 if (cluster->cpg_handle) {
99 crm_trace("Disconnecting CPG");
100 cpg_leave(cluster->cpg_handle, &cluster->group);
101 cpg_finalize(cluster->cpg_handle);
102 cluster->cpg_handle = 0;
103
104 } else {
105 crm_info("No CPG connection");
106 }
107}
108
116uint32_t
117get_local_nodeid(cpg_handle_t handle)
118{
119 cs_error_t rc = CS_OK;
120 int retries = 0;
121 static uint32_t local_nodeid = 0;
122 cpg_handle_t local_handle = handle;
123 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
124 int fd = -1;
125 uid_t found_uid = 0;
126 gid_t found_gid = 0;
127 pid_t found_pid = 0;
128 int rv;
129
130 if(local_nodeid != 0) {
131 return local_nodeid;
132 }
133
134 if(handle == 0) {
135 crm_trace("Creating connection");
136 cs_repeat(rc, retries, 5, cpg_model_initialize(&local_handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
137 if (rc != CS_OK) {
138 crm_err("Could not connect to the CPG API: %s (%d)",
139 cs_strerror(rc), rc);
140 return 0;
141 }
142
143 rc = cpg_fd_get(local_handle, &fd);
144 if (rc != CS_OK) {
145 crm_err("Could not obtain the CPG API connection: %s (%d)",
146 cs_strerror(rc), rc);
147 goto bail;
148 }
149
150 /* CPG provider run as root (in given user namespace, anyway)? */
151 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
152 &found_uid, &found_gid))) {
153 crm_err("CPG provider is not authentic:"
154 " process %lld (uid: %lld, gid: %lld)",
155 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
156 (long long) found_uid, (long long) found_gid);
157 goto bail;
158 } else if (rv < 0) {
159 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
160 strerror(-rv), -rv);
161 goto bail;
162 }
163 }
164
165 if (rc == CS_OK) {
166 retries = 0;
167 crm_trace("Performing lookup");
168 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
169 }
170
171 if (rc != CS_OK) {
172 crm_err("Could not get local node id from the CPG API: %s (%d)",
173 pcmk__cs_err_str(rc), rc);
174 }
175
176bail:
177 if(handle == 0) {
178 crm_trace("Closing connection");
179 cpg_finalize(local_handle);
180 }
181 crm_debug("Local nodeid is %u", local_nodeid);
182 return local_nodeid;
183}
184
193static gboolean
194crm_cs_flush_cb(gpointer data)
195{
196 cs_message_timer = 0;
197 crm_cs_flush(data);
198 return FALSE;
199}
200
201// Send no more than this many CPG messages in one flush
202#define CS_SEND_MAX 200
203
210static void
211crm_cs_flush(gpointer data)
212{
213 unsigned int sent = 0;
214 guint queue_len = 0;
215 cs_error_t rc = 0;
216 cpg_handle_t *handle = (cpg_handle_t *) data;
217
218 if (*handle == 0) {
219 crm_trace("Connection is dead");
220 return;
221 }
222
223 queue_len = g_list_length(cs_message_queue);
224 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
225 crm_err("CPG queue has grown to %d", queue_len);
226
227 } else if (queue_len == CS_SEND_MAX) {
228 crm_warn("CPG queue has grown to %d", queue_len);
229 }
230
231 if (cs_message_timer != 0) {
232 /* There is already a timer, wait until it goes off */
233 crm_trace("Timer active %d", cs_message_timer);
234 return;
235 }
236
237 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
238 struct iovec *iov = cs_message_queue->data;
239
240 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
241 if (rc != CS_OK) {
242 break;
243 }
244
245 sent++;
246 crm_trace("CPG message sent, size=%llu",
247 (unsigned long long) iov->iov_len);
248
249 cs_message_queue = g_list_remove(cs_message_queue, iov);
250 free(iov->iov_base);
251 free(iov);
252 }
253
254 queue_len -= sent;
255 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
256 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
257 sent, pcmk__plural_s(sent), queue_len, pcmk__cs_err_str(rc),
258 (int) rc);
259
260 if (cs_message_queue) {
261 uint32_t delay_ms = 100;
262 if (rc != CS_OK) {
263 /* Proportionally more if sending failed but cap at 1s */
264 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
265 }
266 cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
267 }
268}
269
278static int
279pcmk_cpg_dispatch(gpointer user_data)
280{
281 cs_error_t rc = CS_OK;
282 crm_cluster_t *cluster = (crm_cluster_t *) user_data;
283
284 rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
285 if (rc != CS_OK) {
286 crm_err("Connection to the CPG API failed: %s (%d)",
287 pcmk__cs_err_str(rc), rc);
288 cpg_finalize(cluster->cpg_handle);
289 cluster->cpg_handle = 0;
290 return -1;
291
292 } else if (cpg_evicted) {
293 crm_err("Evicted from CPG membership");
294 return -1;
295 }
296 return 0;
297}
298
299static inline const char *
300ais_dest(const pcmk__cpg_host_t *host)
301{
302 if (host->local) {
303 return "local";
304 } else if (host->size > 0) {
305 return host->uname;
306 } else {
307 return "<all>";
308 }
309}
310
311static inline const char *
312msg_type2text(enum crm_ais_msg_types type)
313{
314 const char *text = "unknown";
315
316 switch (type) {
317 case crm_msg_none:
318 text = "unknown";
319 break;
320 case crm_msg_ais:
321 text = "ais";
322 break;
323 case crm_msg_cib:
324 text = "cib";
325 break;
326 case crm_msg_crmd:
327 text = "crmd";
328 break;
329 case crm_msg_pe:
330 text = "pengine";
331 break;
332 case crm_msg_te:
333 text = "tengine";
334 break;
335 case crm_msg_lrmd:
336 text = "lrmd";
337 break;
338 case crm_msg_attrd:
339 text = "attrd";
340 break;
341 case crm_msg_stonithd:
342 text = "stonithd";
343 break;
345 text = "stonith-ng";
346 break;
347 }
348 return text;
349}
350
359static bool
360check_message_sanity(const pcmk__cpg_msg_t *msg)
361{
362 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
363
364 if (payload_size < 1) {
365 crm_err("%sCPG message %d from %s invalid: "
366 "Claimed size of %d bytes is too small "
367 CRM_XS " from %s[%u] to %s@%s",
368 (msg->is_compressed? "Compressed " : ""),
369 msg->id, ais_dest(&(msg->sender)),
370 (int) msg->header.size,
371 msg_type2text(msg->sender.type), msg->sender.pid,
372 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
373 return false;
374 }
375
376 if (msg->header.error != CS_OK) {
377 crm_err("%sCPG message %d from %s invalid: "
378 "Sender indicated error %d "
379 CRM_XS " from %s[%u] to %s@%s",
380 (msg->is_compressed? "Compressed " : ""),
381 msg->id, ais_dest(&(msg->sender)),
382 msg->header.error,
383 msg_type2text(msg->sender.type), msg->sender.pid,
384 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
385 return false;
386 }
387
388 if (msg_data_len(msg) != payload_size) {
389 crm_err("%sCPG message %d from %s invalid: "
390 "Total size %d inconsistent with payload size %d "
391 CRM_XS " from %s[%u] to %s@%s",
392 (msg->is_compressed? "Compressed " : ""),
393 msg->id, ais_dest(&(msg->sender)),
394 (int) msg->header.size, (int) msg_data_len(msg),
395 msg_type2text(msg->sender.type), msg->sender.pid,
396 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
397 return false;
398 }
399
400 if (!msg->is_compressed &&
401 /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
402 * but checking the last byte or two should be quick
403 */
404 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
405 || (msg->data[msg->size - 1] != '\0'))) {
406 crm_err("CPG message %d from %s invalid: "
407 "Payload does not end at byte %llu "
408 CRM_XS " from %s[%u] to %s@%s",
409 msg->id, ais_dest(&(msg->sender)),
410 (unsigned long long) msg->size,
411 msg_type2text(msg->sender.type), msg->sender.pid,
412 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
413 return false;
414 }
415
416 crm_trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
417 (int) msg->header.size, (msg->is_compressed? "compressed " : ""),
418 msg->id, msg_type2text(msg->sender.type), msg->sender.pid,
419 ais_dest(&(msg->sender)),
420 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
421 return true;
422}
423
440char *
441pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
442 uint32_t *kind, const char **from)
443{
444 char *data = NULL;
445 pcmk__cpg_msg_t *msg = (pcmk__cpg_msg_t *) content;
446
447 if(handle) {
448 // Do filtering and field massaging
449 uint32_t local_nodeid = get_local_nodeid(handle);
450 const char *local_name = get_local_node_name();
451
452 if (msg->sender.id > 0 && msg->sender.id != nodeid) {
453 crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
454 return NULL;
455
456 } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
457 /* Not for us */
458 crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
459 return NULL;
460 } else if (msg->host.size != 0 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
461 /* Not for us */
462 crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
463 return NULL;
464 }
465
466 msg->sender.id = nodeid;
467 if (msg->sender.size == 0) {
468 crm_node_t *peer = crm_get_peer(nodeid, NULL);
469
470 if (peer == NULL) {
471 crm_err("Peer with nodeid=%u is unknown", nodeid);
472
473 } else if (peer->uname == NULL) {
474 crm_err("No uname for peer with nodeid=%u", nodeid);
475
476 } else {
477 crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
478 msg->sender.size = strlen(peer->uname);
479 memset(msg->sender.uname, 0, MAX_NAME);
480 memcpy(msg->sender.uname, peer->uname, msg->sender.size);
481 }
482 }
483 }
484
485 crm_trace("Got new%s message (size=%d, %d, %d)",
486 msg->is_compressed ? " compressed" : "",
487 msg_data_len(msg), msg->size, msg->compressed_size);
488
489 if (kind != NULL) {
490 *kind = msg->header.id;
491 }
492 if (from != NULL) {
493 *from = msg->sender.uname;
494 }
495
496 if (msg->is_compressed && msg->size > 0) {
497 int rc = BZ_OK;
498 char *uncompressed = NULL;
499 unsigned int new_size = msg->size + 1;
500
501 if (!check_message_sanity(msg)) {
502 goto badmsg;
503 }
504
505 crm_trace("Decompressing message data");
506 uncompressed = calloc(1, new_size);
507 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
508
509 if (rc != BZ_OK) {
510 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
511 bz2_strerror(rc), rc);
512 free(uncompressed);
513 goto badmsg;
514 }
515
516 CRM_ASSERT(rc == BZ_OK);
517 CRM_ASSERT(new_size == msg->size);
518
519 data = uncompressed;
520
521 } else if (!check_message_sanity(msg)) {
522 goto badmsg;
523
524 } else {
525 data = strdup(msg->data);
526 }
527
528 // Is this necessary?
529 crm_get_peer(msg->sender.id, msg->sender.uname);
530
531 crm_trace("Payload: %.200s", data);
532 return data;
533
534 badmsg:
535 crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
536 " min=%d, total=%d, size=%d, bz2_size=%d",
537 msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
538 ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
539 msg->sender.pid, (int)sizeof(pcmk__cpg_msg_t),
540 msg->header.size, msg->size, msg->compressed_size);
541
542 free(data);
543 return NULL;
544}
545
557static int
558cmp_member_list_nodeid(const void *first, const void *second)
559{
560 const struct cpg_address *const a = *((const struct cpg_address **) first),
561 *const b = *((const struct cpg_address **) second);
562 if (a->nodeid < b->nodeid) {
563 return -1;
564 } else if (a->nodeid > b->nodeid) {
565 return 1;
566 }
567 /* don't bother with "reason" nor "pid" */
568 return 0;
569}
570
579static const char *
580cpgreason2str(cpg_reason_t reason)
581{
582 switch (reason) {
583 case CPG_REASON_JOIN: return " via cpg_join";
584 case CPG_REASON_LEAVE: return " via cpg_leave";
585 case CPG_REASON_NODEDOWN: return " via cluster exit";
586 case CPG_REASON_NODEUP: return " via cluster join";
587 case CPG_REASON_PROCDOWN: return " for unknown reason";
588 default: break;
589 }
590 return "";
591}
592
601static inline const char *
602peer_name(crm_node_t *peer)
603{
604 if (peer == NULL) {
605 return "unknown node";
606 } else if (peer->uname == NULL) {
607 return "peer node";
608 } else {
609 return peer->uname;
610 }
611}
612
624static void
625node_left(const char *cpg_group_name, int event_counter,
626 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
627 const struct cpg_address **sorted_member_list,
628 size_t member_list_entries)
629{
630 crm_node_t *peer = pcmk__search_cluster_node_cache(cpg_peer->nodeid,
631 NULL);
632 const struct cpg_address **rival = NULL;
633
634 /* Most CPG-related Pacemaker code assumes that only one process on a node
635 * can be in the process group, but Corosync does not impose this
636 * limitation, and more than one can be a member in practice due to a
637 * daemon attempting to start while another instance is already running.
638 *
639 * Check for any such duplicate instances, because we don't want to process
640 * their leaving as if our actual peer left. If the peer that left still has
641 * an entry in sorted_member_list (with a different PID), we will ignore the
642 * leaving.
643 *
644 * @TODO Track CPG members' PIDs so we can tell exactly who left.
645 */
646 if (peer != NULL) {
647 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
648 sizeof(const struct cpg_address *),
649 cmp_member_list_nodeid);
650 }
651
652 if (rival == NULL) {
653 crm_info("Group %s event %d: %s (node %u pid %u) left%s",
654 cpg_group_name, event_counter, peer_name(peer),
655 cpg_peer->nodeid, cpg_peer->pid,
656 cpgreason2str(cpg_peer->reason));
657 if (peer != NULL) {
658 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
660 }
661 } else if (cpg_peer->nodeid == local_nodeid) {
662 crm_warn("Group %s event %d: duplicate local pid %u left%s",
663 cpg_group_name, event_counter,
664 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
665 } else {
666 crm_warn("Group %s event %d: "
667 "%s (node %u) duplicate pid %u left%s (%u remains)",
668 cpg_group_name, event_counter, peer_name(peer),
669 cpg_peer->nodeid, cpg_peer->pid,
670 cpgreason2str(cpg_peer->reason), (*rival)->pid);
671 }
672}
673
686void
687pcmk_cpg_membership(cpg_handle_t handle,
688 const struct cpg_name *groupName,
689 const struct cpg_address *member_list, size_t member_list_entries,
690 const struct cpg_address *left_list, size_t left_list_entries,
691 const struct cpg_address *joined_list, size_t joined_list_entries)
692{
693 int i;
694 gboolean found = FALSE;
695 static int counter = 0;
696 uint32_t local_nodeid = get_local_nodeid(handle);
697 const struct cpg_address **sorted;
698
699 sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
700 CRM_ASSERT(sorted != NULL);
701
702 for (size_t iter = 0; iter < member_list_entries; iter++) {
703 sorted[iter] = member_list + iter;
704 }
705 /* so that the cross-matching multiply-subscribed nodes is then cheap */
706 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
707 cmp_member_list_nodeid);
708
709 for (i = 0; i < left_list_entries; i++) {
710 node_left(groupName->value, counter, local_nodeid, &left_list[i],
711 sorted, member_list_entries);
712 }
713 free(sorted);
714 sorted = NULL;
715
716 for (i = 0; i < joined_list_entries; i++) {
717 crm_info("Group %s event %d: node %u pid %u joined%s",
718 groupName->value, counter, joined_list[i].nodeid,
719 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
720 }
721
722 for (i = 0; i < member_list_entries; i++) {
723 crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
724
725 if (member_list[i].nodeid == local_nodeid
726 && member_list[i].pid != getpid()) {
727 // See the note in node_left()
728 crm_warn("Group %s event %d: detected duplicate local pid %u",
729 groupName->value, counter, member_list[i].pid);
730 continue;
731 }
732 crm_info("Group %s event %d: %s (node %u pid %u) is member",
733 groupName->value, counter, peer_name(peer),
734 member_list[i].nodeid, member_list[i].pid);
735
736 /* If the caller left auto-reaping enabled, this will also update the
737 * state to member.
738 */
739 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
741
742 if (peer && peer->state && strcmp(peer->state, CRM_NODE_MEMBER)) {
743 /* The node is a CPG member, but we currently think it's not a
744 * cluster member. This is possible only if auto-reaping was
745 * disabled. The node may be joining, and we happened to get the CPG
746 * notification before the quorum notification; or the node may have
747 * just died, and we are processing its final messages; or a bug
748 * has affected the peer cache.
749 */
750 time_t now = time(NULL);
751
752 if (peer->when_lost == 0) {
753 // Track when we first got into this contradictory state
754 peer->when_lost = now;
755
756 } else if (now > (peer->when_lost + 60)) {
757 // If it persists for more than a minute, update the state
758 crm_warn("Node %u is member of group %s but was believed offline",
759 member_list[i].nodeid, groupName->value);
760 pcmk__update_peer_state(__func__, peer, CRM_NODE_MEMBER, 0);
761 }
762 }
763
764 if (local_nodeid == member_list[i].nodeid) {
765 found = TRUE;
766 }
767 }
768
769 if (!found) {
770 crm_err("Local node was evicted from group %s", groupName->value);
771 cpg_evicted = true;
772 }
773
774 counter++;
775}
776
784gboolean
786{
787 cs_error_t rc;
788 int fd = -1;
789 int retries = 0;
790 uint32_t id = 0;
791 crm_node_t *peer = NULL;
792 cpg_handle_t handle = 0;
793 const char *message_name = pcmk__message_name(crm_system_name);
794 uid_t found_uid = 0;
795 gid_t found_gid = 0;
796 pid_t found_pid = 0;
797 int rv;
798
799 struct mainloop_fd_callbacks cpg_fd_callbacks = {
800 .dispatch = pcmk_cpg_dispatch,
801 .destroy = cluster->destroy,
802 };
803
804 cpg_model_v1_data_t cpg_model_info = {
805 .model = CPG_MODEL_V1,
806 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
807 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
808 .cpg_totem_confchg_fn = NULL,
809 .flags = 0,
810 };
811
812 cpg_evicted = false;
813 cluster->group.length = 0;
814 cluster->group.value[0] = 0;
815
816 /* group.value is char[128] */
817 strncpy(cluster->group.value, message_name, 127);
818 cluster->group.value[127] = 0;
819 cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
820
821 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
822 if (rc != CS_OK) {
823 crm_err("Could not connect to the CPG API: %s (%d)",
824 cs_strerror(rc), rc);
825 goto bail;
826 }
827
828 rc = cpg_fd_get(handle, &fd);
829 if (rc != CS_OK) {
830 crm_err("Could not obtain the CPG API connection: %s (%d)",
831 cs_strerror(rc), rc);
832 goto bail;
833 }
834
835 /* CPG provider run as root (in given user namespace, anyway)? */
836 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
837 &found_uid, &found_gid))) {
838 crm_err("CPG provider is not authentic:"
839 " process %lld (uid: %lld, gid: %lld)",
840 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
841 (long long) found_uid, (long long) found_gid);
842 rc = CS_ERR_ACCESS;
843 goto bail;
844 } else if (rv < 0) {
845 crm_err("Could not verify authenticity of CPG provider: %s (%d)",
846 strerror(-rv), -rv);
847 rc = CS_ERR_ACCESS;
848 goto bail;
849 }
850
851 id = get_local_nodeid(handle);
852 if (id == 0) {
853 crm_err("Could not get local node id from the CPG API");
854 goto bail;
855
856 }
857 cluster->nodeid = id;
858
859 retries = 0;
860 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->group));
861 if (rc != CS_OK) {
862 crm_err("Could not join the CPG group '%s': %d", message_name, rc);
863 goto bail;
864 }
865
866 pcmk_cpg_handle = handle;
867 cluster->cpg_handle = handle;
868 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
869
870 bail:
871 if (rc != CS_OK) {
872 cpg_finalize(handle);
873 return FALSE;
874 }
875
876 peer = crm_get_peer(id, NULL);
878 return TRUE;
879}
880
891gboolean
892pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
893{
894 gboolean rc = TRUE;
895 char *data = NULL;
896
898 rc = send_cluster_text(crm_class_cluster, data, FALSE, node, dest);
899 free(data);
900 return rc;
901}
902
915gboolean
916send_cluster_text(enum crm_ais_msg_class msg_class, const char *data,
917 gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
918{
919 static int msg_id = 0;
920 static int local_pid = 0;
921 static int local_name_len = 0;
922 static const char *local_name = NULL;
923
924 char *target = NULL;
925 struct iovec *iov;
926 pcmk__cpg_msg_t *msg = NULL;
928
929 switch (msg_class) {
931 break;
932 default:
933 crm_err("Invalid message class: %d", msg_class);
934 return FALSE;
935 }
936
937 CRM_CHECK(dest != crm_msg_ais, return FALSE);
938
939 if (local_name == NULL) {
940 local_name = get_local_node_name();
941 }
942 if ((local_name_len == 0) && (local_name != NULL)) {
943 local_name_len = strlen(local_name);
944 }
945
946 if (data == NULL) {
947 data = "";
948 }
949
950 if (local_pid == 0) {
951 local_pid = getpid();
952 }
953
954 if (sender == crm_msg_none) {
955 sender = local_pid;
956 }
957
958 msg = calloc(1, sizeof(pcmk__cpg_msg_t));
959
960 msg_id++;
961 msg->id = msg_id;
962 msg->header.id = msg_class;
963 msg->header.error = CS_OK;
964
965 msg->host.type = dest;
966 msg->host.local = local;
967
968 if (node) {
969 if (node->uname) {
970 target = strdup(node->uname);
971 msg->host.size = strlen(node->uname);
972 memset(msg->host.uname, 0, MAX_NAME);
973 memcpy(msg->host.uname, node->uname, msg->host.size);
974 } else {
975 target = crm_strdup_printf("%u", node->id);
976 }
977 msg->host.id = node->id;
978 } else {
979 target = strdup("all");
980 }
981
982 msg->sender.id = 0;
983 msg->sender.type = sender;
984 msg->sender.pid = local_pid;
985 msg->sender.size = local_name_len;
986 memset(msg->sender.uname, 0, MAX_NAME);
987 if ((local_name != NULL) && (msg->sender.size != 0)) {
988 memcpy(msg->sender.uname, local_name, msg->sender.size);
989 }
990
991 msg->size = 1 + strlen(data);
992 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
993
994 if (msg->size < CRM_BZ2_THRESHOLD) {
995 msg = pcmk__realloc(msg, msg->header.size);
996 memcpy(msg->data, data, msg->size);
997
998 } else {
999 char *compressed = NULL;
1000 unsigned int new_size = 0;
1001 char *uncompressed = strdup(data);
1002
1003 if (pcmk__compress(uncompressed, (unsigned int) msg->size, 0,
1004 &compressed, &new_size) == pcmk_rc_ok) {
1005
1006 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
1007 msg = pcmk__realloc(msg, msg->header.size);
1008 memcpy(msg->data, compressed, new_size);
1009
1010 msg->is_compressed = TRUE;
1011 msg->compressed_size = new_size;
1012
1013 } else {
1014 // cppcheck seems not to understand the abort logic in pcmk__realloc
1015 // cppcheck-suppress memleak
1016 msg = pcmk__realloc(msg, msg->header.size);
1017 memcpy(msg->data, data, msg->size);
1018 }
1019
1020 free(uncompressed);
1021 free(compressed);
1022 }
1023
1024 iov = calloc(1, sizeof(struct iovec));
1025 iov->iov_base = msg;
1026 iov->iov_len = msg->header.size;
1027
1028 if (msg->compressed_size) {
1029 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
1030 msg->id, target, (unsigned long long) iov->iov_len,
1031 msg->compressed_size, data);
1032 } else {
1033 crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
1034 msg->id, target, (unsigned long long) iov->iov_len,
1035 msg->size, data);
1036 }
1037 free(target);
1038
1039 cs_message_queue = g_list_append(cs_message_queue, iov);
1040 crm_cs_flush(&pcmk_cpg_handle);
1041
1042 return TRUE;
1043}
1044
1053text2msg_type(const char *text)
1054{
1055 int type = crm_msg_none;
1056
1057 CRM_CHECK(text != NULL, return type);
1058 text = pcmk__message_name(text);
1059 if (pcmk__str_eq(text, "ais", pcmk__str_casei)) {
1060 type = crm_msg_ais;
1061 } else if (pcmk__str_eq(text, CRM_SYSTEM_CIB, pcmk__str_casei)) {
1062 type = crm_msg_cib;
1063 } else if (pcmk__strcase_any_of(text, CRM_SYSTEM_CRMD, CRM_SYSTEM_DC, NULL)) {
1065 } else if (pcmk__str_eq(text, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
1066 type = crm_msg_te;
1067 } else if (pcmk__str_eq(text, CRM_SYSTEM_PENGINE, pcmk__str_casei)) {
1068 type = crm_msg_pe;
1069 } else if (pcmk__str_eq(text, CRM_SYSTEM_LRMD, pcmk__str_casei)) {
1071 } else if (pcmk__str_eq(text, CRM_SYSTEM_STONITHD, pcmk__str_casei)) {
1073 } else if (pcmk__str_eq(text, "stonith-ng", pcmk__str_casei)) {
1075 } else if (pcmk__str_eq(text, "attrd", pcmk__str_casei)) {
1077
1078 } else {
1079 /* This will normally be a transient client rather than
1080 * a cluster daemon. Set the type to the pid of the client
1081 */
1082 int scan_rc = sscanf(text, "%d", &type);
1083
1084 if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
1085 /* Ensure it's sane */
1087 }
1088 }
1089 return type;
1090}
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:872
crm_node_t * pcmk__search_cluster_node_cache(unsigned int id, const char *uname)
Definition: membership.c:560
@ crm_proc_cpg
Definition: internal.h:21
crm_node_t * pcmk__update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1075
crm_ais_msg_types
Definition: cluster.h:103
@ crm_msg_stonithd
Definition: cluster.h:110
@ crm_msg_none
Definition: cluster.h:104
@ crm_msg_cib
Definition: cluster.h:107
@ crm_msg_pe
Definition: cluster.h:112
@ crm_msg_attrd
Definition: cluster.h:109
@ crm_msg_ais
Definition: cluster.h:105
@ crm_msg_te
Definition: cluster.h:111
@ crm_msg_stonith_ng
Definition: cluster.h:113
@ crm_msg_crmd
Definition: cluster.h:108
@ crm_msg_lrmd
Definition: cluster.h:106
const char * get_local_node_name(void)
Get the local node's name.
Definition: cluster.c:155
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Get a cluster node cache entry.
Definition: membership.c:700
#define CRM_NODE_MEMBER
Definition: cluster.h:33
crm_ais_msg_class
Definition: cluster.h:99
@ crm_class_cluster
Definition: cluster.h:100
#define ONLINESTATUS
Definition: util.h:39
#define OFFLINESTATUS
Definition: util.h:40
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
uint32_t compressed_size
Definition: cpg.c:8
pcmk__cpg_host_t host
Definition: cpg.c:4
gboolean pcmk__cpg_send_xml(xmlNode *msg, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:892
pcmk__cpg_host_t sender
Definition: cpg.c:5
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Handle a CPG configuration change event.
Definition: cpg.c:687
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Disconnect from Corosync CPG.
Definition: cpg.c:95
struct pcmk__cpg_msg_s pcmk__cpg_msg_t
Definition: cpg.c:72
#define CS_SEND_MAX
Definition: cpg.c:202
gboolean send_cluster_text(enum crm_ais_msg_class msg_class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:916
#define msg_data_len(msg)
Definition: cpg.c:76
enum crm_ais_msg_types type
Definition: cpg.c:3
char uname[MAX_NAME]
Definition: cpg.c:5
char data[0]
Definition: cpg.c:10
uint32_t get_local_nodeid(cpg_handle_t handle)
Get the local Corosync node ID (via CPG)
Definition: cpg.c:117
uint32_t size
Definition: cpg.c:4
uint32_t id
Definition: cpg.c:0
gboolean is_compressed
Definition: cpg.c:2
enum crm_ais_msg_types text2msg_type(const char *text)
Get the message type equivalent of a string.
Definition: cpg.c:1053
gboolean local
Definition: cpg.c:2
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Extract text data from a Corosync CPG message.
Definition: cpg.c:441
#define cs_repeat(rc, counter, max, code)
Definition: cpg.c:78
uint32_t pid
Definition: cpg.c:1
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Connect to Corosync CPG.
Definition: cpg.c:785
struct pcmk__cpg_host_s pcmk__cpg_host_t
Definition: cpg.c:55
struct pcmk__cpg_host_s __attribute__((packed))
#define CRM_SYSTEM_CIB
Definition: crm.h:104
#define CRM_SYSTEM_CRMD
Definition: crm.h:105
#define CRM_SYSTEM_DC
Definition: crm.h:102
#define CRM_SYSTEM_STONITHD
Definition: crm.h:109
#define CRM_SYSTEM_LRMD
Definition: crm.h:106
#define CRM_SYSTEM_TENGINE
Definition: crm.h:108
#define MAX_NAME
Maximum length of a Corosync cluster node name (in bytes)
Definition: crm.h:76
char * crm_system_name
Definition: utils.c:51
#define CRM_SYSTEM_PENGINE
Definition: crm.h:107
IPC interface to Pacemaker daemons.
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process (legacy)
Definition: ipc_client.c:1441
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:60
#define crm_info(fmt, args...)
Definition: logging.h:362
#define do_crm_log(level, fmt, args...)
Log a message.
Definition: logging.h:168
#define crm_warn(fmt, args...)
Definition: logging.h:360
#define CRM_XS
Definition: logging.h:55
#define crm_notice(fmt, args...)
Definition: logging.h:361
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:227
#define crm_debug(fmt, args...)
Definition: logging.h:364
#define crm_err(fmt, args...)
Definition: logging.h:359
#define crm_trace(fmt, args...)
Definition: logging.h:365
#define LOG_TRACE
Definition: logging.h:37
Wrappers for and extensions to glib mainloop.
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:182
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:956
const char * pcmk__message_name(const char *name)
Get name to be used as identifier for cluster messages.
Definition: messages.c:180
const char * target
Definition: pcmk_fence.c:29
char * strerror(int errnum)
const char * bz2_strerror(int rc)
Definition: results.c:823
#define CRM_ASSERT(expr)
Definition: results.h:42
@ pcmk_rc_ok
Definition: results.h:148
#define pcmk__plural_s(i)
int pcmk__compress(const char *data, unsigned int length, unsigned int max, char **result, unsigned int *result_len)
Definition: strings.c:746
bool pcmk__strcase_any_of(const char *s,...) G_GNUC_NULL_TERMINATED
Definition: strings.c:928
@ pcmk__str_casei
uint32_t nodeid
Definition: cluster.h:80
void(* destroy)(gpointer)
Definition: cluster.h:82
char * uname
Definition: cluster.h:53
uint32_t id
Definition: cluster.h:66
char * state
Definition: cluster.h:55
time_t when_lost
Definition: cluster.h:67
int(* dispatch)(gpointer userdata)
Dispatch function for mainloop file descriptor with data ready.
Definition: mainloop.h:138
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2121
#define CRM_BZ2_THRESHOLD
Definition: xml.h:48