16 #include <sys/types.h>
/*
 * NOTE(review): presumably the default limit on the number of IPC messages
 * that may be queued for one client, used when no valid override has been
 * parsed into client->queue_max. The use site is not visible here; confirm.
 */
25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
/*
 * Table of the IPC clients currently known to this process.
 *
 * The table stays NULL until the first client is registered: it is created
 * on demand with g_direct_hash/g_direct_equal, so keys are matched by pointer
 * identity rather than by content. Entries are removed using either the
 * client's ipcs pointer or its id as the key, and at cleanup the table is
 * destroyed and the pointer reset to NULL. Every reader must therefore check
 * for NULL before using it.
 */
27 static GHashTable *client_connections = NULL;
38 return client_connections? g_hash_table_size(client_connections) : 0;
53 if ((func != NULL) && (client_connections != NULL)) {
54 g_hash_table_foreach(client_connections, func, user_data);
61 if (client_connections) {
62 return g_hash_table_lookup(client_connections, c);
76 if (client_connections &&
id) {
77 g_hash_table_iter_init(&iter, client_connections);
78 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79 if (strcmp(client->id,
id) == 0) {
85 crm_trace(
"No client found with id=%s",
id);
94 }
else if (c->
name == NULL && c->
id == NULL) {
96 }
else if (c->
name == NULL) {
106 if (client_connections != NULL) {
107 int active = g_hash_table_size(client_connections);
110 crm_err(
"Exiting with %d active IPC client%s",
113 g_hash_table_destroy(client_connections); client_connections = NULL;
120 qb_ipcs_connection_t *c = NULL;
122 if (service == NULL) {
126 c = qb_ipcs_connection_first_get(service);
129 qb_ipcs_connection_t *last = c;
131 c = qb_ipcs_connection_next_get(service, last);
134 crm_notice(
"Disconnecting client %p, pid=%d...",
136 qb_ipcs_disconnect(last);
137 qb_ipcs_connection_unref(last);
152 client_from_connection(qb_ipcs_connection_t *c,
void *key, uid_t uid_client)
156 if (client == NULL) {
164 if (client->
user == NULL) {
165 client->
user = strdup(
"#unprivileged");
167 crm_err(
"Unable to enforce ACLs for user ID %d, assuming unprivileged",
180 if (client->
id == NULL) {
181 crm_err(
"Could not generate UUID for client");
189 if (client_connections == NULL) {
191 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
193 g_hash_table_insert(client_connections, key, client);
216 gid_t uid_cluster = 0;
217 gid_t gid_cluster = 0;
224 static bool need_log = TRUE;
227 crm_warn(
"Could not find user and group IDs for user %s",
233 if (uid_client != 0) {
234 crm_trace(
"Giving group %u access to new IPC connection", gid_cluster);
236 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
240 client = client_from_connection(c, NULL, uid_client);
241 if (client == NULL) {
245 if ((uid_client == 0) || (uid_client == uid_cluster)) {
250 crm_debug(
"New IPC client %s for PID %u with uid %d and gid %d",
251 client->
id, client->
pid, uid_client, gid_client);
255 static struct iovec *
256 pcmk__new_ipc_event(
void)
258 struct iovec *iov = calloc(2,
sizeof(
struct iovec));
273 free(event[0].iov_base);
274 free(event[1].iov_base);
280 free_event(gpointer
data)
301 if (client_connections) {
303 crm_trace(
"Destroying %p/%p (%d remaining)",
304 c, c->
ipcs, g_hash_table_size(client_connections) - 1);
305 g_hash_table_remove(client_connections, c->
ipcs);
308 crm_trace(
"Destroying remote connection %p (%d remaining)",
309 c, g_hash_table_size(client_connections) - 1);
310 g_hash_table_remove(client_connections, c->
id);
353 if ((errno == 0) && (qmax_int > 0)) {
354 client->
queue_max = (
unsigned int) qmax_int;
364 struct qb_ipcs_connection_stats stats;
366 stats.client_pid = 0;
367 qb_ipcs_connection_stats_get(c, &stats, 0);
368 return stats.client_pid;
387 char *uncompressed = NULL;
396 *
id = ((
struct qb_ipc_response_header *)
data)->id;
413 uncompressed = calloc(1, size_u);
415 crm_trace(
"Decompressing message data %u bytes into %u bytes",
418 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->
size_compressed, 1, 0);
441 crm_ipcs_flush_events_cb(gpointer
data)
446 crm_ipcs_flush_events(c);
461 guint
delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
479 unsigned int sent = 0;
480 unsigned int queue_len = 0;
496 struct iovec *
event = NULL;
506 qb_rc = qb_ipcs_event_sendv(c->
ipcs, event, 2);
514 header =
event[0].iov_base;
516 crm_trace(
"Event %d to %p[%d] (%lld compressed bytes) sent",
517 header->
qb.id, c->
ipcs, c->
pid, (
long long) qb_rc);
519 crm_trace(
"Event %d to %p[%d] (%lld bytes) sent: %.120s",
520 header->
qb.id, c->
ipcs, c->
pid, (
long long) qb_rc,
521 (
char *) (event[1].iov_base));
527 if (sent > 0 || queue_len) {
528 crm_trace(
"Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
529 sent, queue_len, c->
ipcs, c->
pid,
540 if ((c->
queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
542 crm_warn(
"Client with process ID %u has a backlog of %u messages "
545 crm_err(
"Evicting client with process ID %u due to backlog of %u messages "
548 qb_ipcs_disconnect(c->
ipcs);
554 delay_next_flush(c, queue_len);
578 uint32_t max_send_size,
struct iovec **result,
581 static unsigned int biggest = 0;
583 unsigned int total = 0;
584 char *compressed = NULL;
588 if ((message == NULL) || (result == NULL)) {
593 if (header == NULL) {
599 if (max_send_size == 0) {
605 iov = pcmk__new_ipc_event();
607 iov[0].iov_base = header;
613 if (total < max_send_size) {
614 iov[1].iov_base = buffer;
618 unsigned int new_size = 0;
621 (
unsigned int) max_send_size, &compressed,
628 iov[1].iov_base = compressed;
638 crm_err(
"Could not compress %u-byte message into less than IPC "
639 "limit of %u bytes; set PCMK_ipc_buffer to higher value "
640 "(%u bytes suggested)",
650 header->
qb.size = iov[0].iov_len + iov[1].iov_len;
651 header->
qb.id = (int32_t)request;
656 *bytes = header->
qb.size;
665 static uint32_t
id = 1;
682 header->
qb.id =
id++;
689 struct iovec *iov_copy = pcmk__new_ipc_event();
692 iov_copy[0].iov_len = iov[0].iov_len;
693 iov_copy[0].iov_base = malloc(iov[0].iov_len);
694 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
696 iov_copy[1].iov_len = iov[1].iov_len;
697 iov_copy[1].iov_base = malloc(iov[1].iov_len);
698 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
700 add_event(c, iov_copy);
708 qb_rc = qb_ipcs_response_sendv(c->
ipcs, iov, 2);
709 if (qb_rc < header->qb.size) {
713 crm_notice(
"Response %d to pid %d failed: %s "
714 CRM_XS " bytes=%u rc=%lld ipcs=%p",
716 header->
qb.size, (
long long) qb_rc, c->
ipcs);
719 crm_trace(
"Response %d sent, %lld bytes to %p[%d]",
720 header->
qb.id, (
long long) qb_rc, c->
ipcs, c->
pid);
729 rc = crm_ipcs_flush_events(c);
731 crm_ipcs_flush_events(c);
734 if ((
rc == EPIPE) || (
rc == ENOTCONN)) {
744 struct iovec *iov = NULL;
780 uint32_t request, uint32_t
flags,
const char *tag,
788 crm_trace(
"Ack'ing IPC message from %s as <%s status=%d>",
813 qb_ipcs_service_t **ipcs_rw,
814 qb_ipcs_service_t **ipcs_shm,
815 struct qb_ipcs_service_handlers *ro_cb,
816 struct qb_ipcs_service_handlers *rw_cb)
819 QB_IPC_NATIVE, ro_cb);
822 QB_IPC_NATIVE, rw_cb);
827 if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
828 crm_err(
"Failed to create the CIB manager: exiting and inhibiting respawn");
829 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled");
847 qb_ipcs_service_t *ipcs_rw,
848 qb_ipcs_service_t *ipcs_shm)
850 qb_ipcs_destroy(ipcs_ro);
851 qb_ipcs_destroy(ipcs_rw);
852 qb_ipcs_destroy(ipcs_shm);
879 struct qb_ipcs_service_handlers *cb)
884 crm_err(
"Failed to create pacemaker-attrd server: exiting and inhibiting respawn");
885 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
900 struct qb_ipcs_service_handlers *cb)
906 crm_err(
"Failed to create fencer: exiting and inhibiting respawn.");
907 crm_warn(
"Verify pacemaker and pacemaker_remote are not both enabled.");
928 || !strcmp(
name,
"stonith-ng")
929 || !strcmp(
name,
"attrd")