pacemaker 2.1.5-a3f44794f94
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1/*
2 * Copyright 2008-2022 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10#include <crm_internal.h>
11#include <crm/crm.h>
12
13#include <sys/param.h>
14#include <stdio.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <unistd.h>
18#include <sys/socket.h>
19#include <arpa/inet.h>
20#include <netinet/in.h>
21#include <netinet/ip.h>
22#include <netinet/tcp.h>
23#include <netdb.h>
24#include <stdlib.h>
25#include <errno.h>
26#include <inttypes.h> // PRIx32
27
28#include <glib.h>
29#include <bzlib.h>
30
32#include <crm/common/xml.h>
33#include <crm/common/mainloop.h>
35
36#ifdef HAVE_GNUTLS_GNUTLS_H
37# include <gnutls/gnutls.h>
38#endif
39
40/* Swab macros from linux/swab.h */
41#ifdef HAVE_LINUX_SWAB_H
42# include <linux/swab.h>
43#else
/*
 * Fallback byte-swapping macros. Each one reverses the byte order of its
 * argument and yields a value of the matching fixed-width unsigned type.
 *
 * The constants are cast explicitly because there is no portable way to know
 * for sure how the U/UL/ULL suffixes map onto 16-, 32-, and 64-bit types.
 */
#define __swab16(x) ((uint16_t)(                                   \
    (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) |                   \
    (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))

#define __swab32(x) ((uint32_t)(                                   \
    (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) |             \
    (((uint32_t)(x) & (uint32_t)0x0000ff00UL) <<  8) |             \
    (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >>  8) |             \
    (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))

#define __swab64(x) ((uint64_t)(                                   \
    (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) |    \
    (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) |    \
    (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) |    \
    (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) <<  8) |    \
    (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >>  8) |    \
    (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) |    \
    (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) |    \
    (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
67#endif
68
69#define REMOTE_MSG_VERSION 1
70#define ENDIAN_LOCAL 0xBADADBBD
71
/* Fixed-size header that precedes the payload of every remote message.
 * The structure is packed, so its in-memory layout has no padding.
 */
struct remote_header_v0 {
    uint32_t endian;                /* Byte-order marker; compared against ENDIAN_LOCAL */
    uint32_t version;               /* Message format version of the sender */
    uint64_t id;                    /* Message identifier assigned by the sender */
    uint64_t flags;                 /* Message flags */
    uint32_t size_total;            /* Size of header plus payload, in bytes */
    uint32_t payload_offset;        /* Offset of the payload from the start of the header */
    uint32_t payload_compressed;    /* Compressed payload size (0 if not compressed) */
    uint32_t payload_uncompressed;  /* Uncompressed payload size, in bytes */

    /* New fields get added here */

} __attribute__ ((packed));
85
97static struct remote_header_v0 *
98localized_remote_header(pcmk__remote_t *remote)
99{
100 struct remote_header_v0 *header = (struct remote_header_v0 *)remote->buffer;
101 if(remote->buffer_offset < sizeof(struct remote_header_v0)) {
102 return NULL;
103
104 } else if(header->endian != ENDIAN_LOCAL) {
105 uint32_t endian = __swab32(header->endian);
106
108 if(endian != ENDIAN_LOCAL) {
109 crm_err("Invalid message detected, endian mismatch: %" PRIx32
110 " is neither %" PRIx32 " nor the swab'd %" PRIx32,
111 ENDIAN_LOCAL, header->endian, endian);
112 return NULL;
113 }
114
115 header->id = __swab64(header->id);
116 header->flags = __swab64(header->flags);
117 header->endian = __swab32(header->endian);
118
119 header->version = __swab32(header->version);
120 header->size_total = __swab32(header->size_total);
121 header->payload_offset = __swab32(header->payload_offset);
122 header->payload_compressed = __swab32(header->payload_compressed);
123 header->payload_uncompressed = __swab32(header->payload_uncompressed);
124 }
125
126 return header;
127}
128
129#ifdef HAVE_GNUTLS_GNUTLS_H
130
131int
132pcmk__tls_client_handshake(pcmk__remote_t *remote, int timeout_ms)
133{
134 int rc = 0;
135 int pollrc = 0;
136 time_t time_limit = time(NULL) + timeout_ms / 1000;
137
138 do {
139 rc = gnutls_handshake(*remote->tls_session);
140 if ((rc == GNUTLS_E_INTERRUPTED) || (rc == GNUTLS_E_AGAIN)) {
141 pollrc = pcmk__remote_ready(remote, 1000);
142 if ((pollrc != pcmk_rc_ok) && (pollrc != ETIME)) {
143 /* poll returned error, there is no hope */
144 crm_trace("TLS handshake poll failed: %s (%d)",
145 pcmk_strerror(pollrc), pollrc);
146 return pcmk_legacy2rc(pollrc);
147 }
148 } else if (rc < 0) {
149 crm_trace("TLS handshake failed: %s (%d)",
150 gnutls_strerror(rc), rc);
151 return EPROTO;
152 } else {
153 return pcmk_rc_ok;
154 }
155 } while (time(NULL) < time_limit);
156 return ETIME;
157}
158
165static void
166set_minimum_dh_bits(const gnutls_session_t *session)
167{
168 int dh_min_bits;
169
170 pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &dh_min_bits, 0);
171
172 /* This function is deprecated since GnuTLS 3.1.7, in favor of letting
173 * the priority string imply the DH requirements, but this is the only
174 * way to give the user control over compatibility with older servers.
175 */
176 if (dh_min_bits > 0) {
177 crm_info("Requiring server use a Diffie-Hellman prime of at least %d bits",
178 dh_min_bits);
179 gnutls_dh_set_prime_bits(*session, dh_min_bits);
180 }
181}
182
/*!
 * \internal
 * \brief Clamp a Diffie-Hellman prime size to the user-configured bounds
 *
 * The bounds come from the PCMK_dh_min_bits and PCMK_dh_max_bits environment
 * variables; a bound that is not positive is not applied, and a maximum below
 * the minimum is ignored with a warning.
 *
 * \param[in] dh_bits  Desired prime size in bits
 *
 * \return \p dh_bits, raised to the minimum or lowered to the maximum as needed
 */
static unsigned int
get_bound_dh_bits(unsigned int dh_bits)
{
    int lower;
    int upper;

    pcmk__scan_min_int(getenv("PCMK_dh_min_bits"), &lower, 0);
    pcmk__scan_min_int(getenv("PCMK_dh_max_bits"), &upper, 0);

    if ((upper > 0) && (upper < lower)) {
        crm_warn("Ignoring PCMK_dh_max_bits less than PCMK_dh_min_bits");
        upper = 0;
    }

    // The minimum is checked first, exactly one bound can apply
    if ((lower > 0) && (dh_bits < (unsigned int) lower)) {
        return (unsigned int) lower;
    }
    if ((upper > 0) && (dh_bits > (unsigned int) upper)) {
        return (unsigned int) upper;
    }
    return dh_bits;
}
203
216pcmk__new_tls_session(int csock, unsigned int conn_type,
217 gnutls_credentials_type_t cred_type, void *credentials)
218{
219 int rc = GNUTLS_E_SUCCESS;
220 const char *prio_base = NULL;
221 char *prio = NULL;
222 gnutls_session_t *session = NULL;
223
224 /* Determine list of acceptable ciphers, etc. Pacemaker always adds the
225 * values required for its functionality.
226 *
227 * For an example of anonymous authentication, see:
228 * http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication
229 */
230
231 prio_base = getenv("PCMK_tls_priorities");
232 if (prio_base == NULL) {
233 prio_base = PCMK_GNUTLS_PRIORITIES;
234 }
235 prio = crm_strdup_printf("%s:%s", prio_base,
236 (cred_type == GNUTLS_CRD_ANON)? "+ANON-DH" : "+DHE-PSK:+PSK");
237
238 session = gnutls_malloc(sizeof(gnutls_session_t));
239 if (session == NULL) {
240 rc = GNUTLS_E_MEMORY_ERROR;
241 goto error;
242 }
243
244 rc = gnutls_init(session, conn_type);
245 if (rc != GNUTLS_E_SUCCESS) {
246 goto error;
247 }
248
249 /* @TODO On the server side, it would be more efficient to cache the
250 * priority with gnutls_priority_init2() and set it with
251 * gnutls_priority_set() for all sessions.
252 */
253 rc = gnutls_priority_set_direct(*session, prio, NULL);
254 if (rc != GNUTLS_E_SUCCESS) {
255 goto error;
256 }
257 if (conn_type == GNUTLS_CLIENT) {
258 set_minimum_dh_bits(session);
259 }
260
261 gnutls_transport_set_ptr(*session,
262 (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
263
264 rc = gnutls_credentials_set(*session, cred_type, credentials);
265 if (rc != GNUTLS_E_SUCCESS) {
266 goto error;
267 }
268 free(prio);
269 return session;
270
271error:
272 crm_err("Could not initialize %s TLS %s session: %s "
273 CRM_XS " rc=%d priority='%s'",
274 (cred_type == GNUTLS_CRD_ANON)? "anonymous" : "PSK",
275 (conn_type == GNUTLS_SERVER)? "server" : "client",
276 gnutls_strerror(rc), rc, prio);
277 free(prio);
278 if (session != NULL) {
279 gnutls_free(session);
280 }
281 return NULL;
282}
283
299int
300pcmk__init_tls_dh(gnutls_dh_params_t *dh_params)
301{
302 int rc = GNUTLS_E_SUCCESS;
303 unsigned int dh_bits = 0;
304
305 rc = gnutls_dh_params_init(dh_params);
306 if (rc != GNUTLS_E_SUCCESS) {
307 goto error;
308 }
309
310 dh_bits = gnutls_sec_param_to_pk_bits(GNUTLS_PK_DH,
311 GNUTLS_SEC_PARAM_NORMAL);
312 if (dh_bits == 0) {
313 rc = GNUTLS_E_DH_PRIME_UNACCEPTABLE;
314 goto error;
315 }
316 dh_bits = get_bound_dh_bits(dh_bits);
317
318 crm_info("Generating Diffie-Hellman parameters with %u-bit prime for TLS",
319 dh_bits);
320 rc = gnutls_dh_params_generate2(*dh_params, dh_bits);
321 if (rc != GNUTLS_E_SUCCESS) {
322 goto error;
323 }
324
325 return pcmk_rc_ok;
326
327error:
328 crm_err("Could not initialize Diffie-Hellman parameters for TLS: %s "
329 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
330 return EPROTO;
331}
332
344int
345pcmk__read_handshake_data(const pcmk__client_t *client)
346{
347 int rc = 0;
348
349 CRM_ASSERT(client && client->remote && client->remote->tls_session);
350
351 do {
352 rc = gnutls_handshake(*client->remote->tls_session);
353 } while (rc == GNUTLS_E_INTERRUPTED);
354
355 if (rc == GNUTLS_E_AGAIN) {
356 /* No more data is available at the moment. This function should be
357 * invoked again once the client sends more.
358 */
359 return EAGAIN;
360 } else if (rc != GNUTLS_E_SUCCESS) {
361 crm_err("TLS handshake with remote client failed: %s "
362 CRM_XS " rc=%d", gnutls_strerror(rc), rc);
363 return EPROTO;
364 }
365 return pcmk_rc_ok;
366}
367
368// \return Standard Pacemaker return code
369static int
370send_tls(gnutls_session_t *session, struct iovec *iov)
371{
372 const char *unsent = iov->iov_base;
373 size_t unsent_len = iov->iov_len;
374 ssize_t gnutls_rc;
375
376 if (unsent == NULL) {
377 return EINVAL;
378 }
379
380 crm_trace("Sending TLS message of %llu bytes",
381 (unsigned long long) unsent_len);
382 while (true) {
383 gnutls_rc = gnutls_record_send(*session, unsent, unsent_len);
384
385 if (gnutls_rc == GNUTLS_E_INTERRUPTED || gnutls_rc == GNUTLS_E_AGAIN) {
386 crm_trace("Retrying to send %llu bytes remaining",
387 (unsigned long long) unsent_len);
388
389 } else if (gnutls_rc < 0) {
390 // Caller can log as error if necessary
391 crm_info("TLS connection terminated: %s " CRM_XS " rc=%lld",
392 gnutls_strerror((int) gnutls_rc),
393 (long long) gnutls_rc);
394 return ECONNABORTED;
395
396 } else if (gnutls_rc < unsent_len) {
397 crm_trace("Sent %lld of %llu bytes remaining",
398 (long long) gnutls_rc, (unsigned long long) unsent_len);
399 unsent_len -= gnutls_rc;
400 unsent += gnutls_rc;
401 } else {
402 crm_trace("Sent all %lld bytes remaining", (long long) gnutls_rc);
403 break;
404 }
405 }
406 return pcmk_rc_ok;
407}
408#endif
409
410// \return Standard Pacemaker return code
411static int
412send_plaintext(int sock, struct iovec *iov)
413{
414 const char *unsent = iov->iov_base;
415 size_t unsent_len = iov->iov_len;
416 ssize_t write_rc;
417
418 if (unsent == NULL) {
419 return EINVAL;
420 }
421
422 crm_debug("Sending plaintext message of %llu bytes to socket %d",
423 (unsigned long long) unsent_len, sock);
424 while (true) {
425 write_rc = write(sock, unsent, unsent_len);
426 if (write_rc < 0) {
427 int rc = errno;
428
429 if ((errno == EINTR) || (errno == EAGAIN)) {
430 crm_trace("Retrying to send %llu bytes remaining to socket %d",
431 (unsigned long long) unsent_len, sock);
432 continue;
433 }
434
435 // Caller can log as error if necessary
436 crm_info("Could not send message: %s " CRM_XS " rc=%d socket=%d",
437 pcmk_rc_str(rc), rc, sock);
438 return rc;
439
440 } else if (write_rc < unsent_len) {
441 crm_trace("Sent %lld of %llu bytes remaining",
442 (long long) write_rc, (unsigned long long) unsent_len);
443 unsent += write_rc;
444 unsent_len -= write_rc;
445 continue;
446
447 } else {
448 crm_trace("Sent all %lld bytes remaining: %.100s",
449 (long long) write_rc, (char *) (iov->iov_base));
450 break;
451 }
452 }
453 return pcmk_rc_ok;
454}
455
456// \return Standard Pacemaker return code
457static int
458remote_send_iovs(pcmk__remote_t *remote, struct iovec *iov, int iovs)
459{
460 int rc = pcmk_rc_ok;
461
462 for (int lpc = 0; (lpc < iovs) && (rc == pcmk_rc_ok); lpc++) {
463#ifdef HAVE_GNUTLS_GNUTLS_H
464 if (remote->tls_session) {
465 rc = send_tls(remote->tls_session, &(iov[lpc]));
466 continue;
467 }
468#endif
469 if (remote->tcp_socket) {
470 rc = send_plaintext(remote->tcp_socket, &(iov[lpc]));
471 } else {
472 rc = ESOCKTNOSUPPORT;
473 }
474 }
475 return rc;
476}
477
487int
489{
490 int rc = pcmk_rc_ok;
491 static uint64_t id = 0;
492 char *xml_text = NULL;
493
494 struct iovec iov[2];
495 struct remote_header_v0 *header;
496
497 CRM_CHECK((remote != NULL) && (msg != NULL), return EINVAL);
498
499 xml_text = dump_xml_unformatted(msg);
500 CRM_CHECK(xml_text != NULL, return EINVAL);
501
502 header = calloc(1, sizeof(struct remote_header_v0));
503 CRM_ASSERT(header != NULL);
504
505 iov[0].iov_base = header;
506 iov[0].iov_len = sizeof(struct remote_header_v0);
507
508 iov[1].iov_base = xml_text;
509 iov[1].iov_len = 1 + strlen(xml_text);
510
511 id++;
512 header->id = id;
513 header->endian = ENDIAN_LOCAL;
514 header->version = REMOTE_MSG_VERSION;
515 header->payload_offset = iov[0].iov_len;
516 header->payload_uncompressed = iov[1].iov_len;
517 header->size_total = iov[0].iov_len + iov[1].iov_len;
518
519 rc = remote_send_iovs(remote, iov, 2);
520 if (rc != pcmk_rc_ok) {
521 crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
522 pcmk_rc_str(rc), rc);
523 }
524
525 free(iov[0].iov_base);
526 free(iov[1].iov_base);
527 return rc;
528}
529
539xmlNode *
541{
542 xmlNode *xml = NULL;
543 struct remote_header_v0 *header = localized_remote_header(remote);
544
545 if (header == NULL) {
546 return NULL;
547 }
548
549 /* Support compression on the receiving end now, in case we ever want to add it later */
550 if (header->payload_compressed) {
551 int rc = 0;
552 unsigned int size_u = 1 + header->payload_uncompressed;
553 char *uncompressed = calloc(1, header->payload_offset + size_u);
554
555 crm_trace("Decompressing message data %d bytes into %d bytes",
556 header->payload_compressed, size_u);
557
558 rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
559 remote->buffer + header->payload_offset,
560 header->payload_compressed, 1, 0);
561
562 if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
563 crm_warn("Couldn't decompress v%d message, we only understand v%d",
564 header->version, REMOTE_MSG_VERSION);
565 free(uncompressed);
566 return NULL;
567
568 } else if (rc != BZ_OK) {
569 crm_err("Decompression failed: %s " CRM_XS " bzerror=%d",
570 bz2_strerror(rc), rc);
571 free(uncompressed);
572 return NULL;
573 }
574
575 CRM_ASSERT(size_u == header->payload_uncompressed);
576
577 memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
578 remote->buffer_size = header->payload_offset + size_u;
579
580 free(remote->buffer);
581 remote->buffer = uncompressed;
582 header = localized_remote_header(remote);
583 }
584
585 /* take ownership of the buffer */
586 remote->buffer_offset = 0;
587
588 CRM_LOG_ASSERT(remote->buffer[sizeof(struct remote_header_v0) + header->payload_uncompressed - 1] == 0);
589
590 xml = string2xml(remote->buffer + header->payload_offset);
591 if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
592 crm_warn("Couldn't parse v%d message, we only understand v%d",
593 header->version, REMOTE_MSG_VERSION);
594
595 } else if (xml == NULL) {
596 crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
597 }
598
599 return xml;
600}
601
602static int
603get_remote_socket(const pcmk__remote_t *remote)
604{
605#ifdef HAVE_GNUTLS_GNUTLS_H
606 if (remote->tls_session) {
607 void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
608
609 return GPOINTER_TO_INT(sock_ptr);
610 }
611#endif
612
613 if (remote->tcp_socket) {
614 return remote->tcp_socket;
615 }
616
617 crm_err("Remote connection type undetermined (bug?)");
618 return -1;
619}
620
632int
633pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
634{
635 struct pollfd fds = { 0, };
636 int sock = 0;
637 int rc = 0;
638 time_t start;
639 int timeout = timeout_ms;
640
641 sock = get_remote_socket(remote);
642 if (sock <= 0) {
643 crm_trace("No longer connected");
644 return ENOTCONN;
645 }
646
647 start = time(NULL);
648 errno = 0;
649 do {
650 fds.fd = sock;
651 fds.events = POLLIN;
652
653 /* If we got an EINTR while polling, and we have a
654 * specific timeout we are trying to honor, attempt
655 * to adjust the timeout to the closest second. */
656 if (errno == EINTR && (timeout > 0)) {
657 timeout = timeout_ms - ((time(NULL) - start) * 1000);
658 if (timeout < 1000) {
659 timeout = 1000;
660 }
661 }
662
663 rc = poll(&fds, 1, timeout);
664 } while (rc < 0 && errno == EINTR);
665
666 if (rc < 0) {
667 return errno;
668 }
669 return (rc == 0)? ETIME : pcmk_rc_ok;
670}
671
684static int
685read_available_remote_data(pcmk__remote_t *remote)
686{
687 int rc = pcmk_rc_ok;
688 size_t read_len = sizeof(struct remote_header_v0);
689 struct remote_header_v0 *header = localized_remote_header(remote);
690 bool received = false;
691 ssize_t read_rc;
692
693 if(header) {
694 /* Stop at the end of the current message */
695 read_len = header->size_total;
696 }
697
698 /* automatically grow the buffer when needed */
699 if(remote->buffer_size < read_len) {
700 remote->buffer_size = 2 * read_len;
701 crm_trace("Expanding buffer to %llu bytes",
702 (unsigned long long) remote->buffer_size);
703 remote->buffer = pcmk__realloc(remote->buffer, remote->buffer_size + 1);
704 }
705
706#ifdef HAVE_GNUTLS_GNUTLS_H
707 if (!received && remote->tls_session) {
708 read_rc = gnutls_record_recv(*(remote->tls_session),
709 remote->buffer + remote->buffer_offset,
710 remote->buffer_size - remote->buffer_offset);
711 if (read_rc == GNUTLS_E_INTERRUPTED) {
712 rc = EINTR;
713 } else if (read_rc == GNUTLS_E_AGAIN) {
714 rc = EAGAIN;
715 } else if (read_rc < 0) {
716 crm_debug("TLS receive failed: %s (%lld)",
717 gnutls_strerror(read_rc), (long long) read_rc);
718 rc = EIO;
719 }
720 received = true;
721 }
722#endif
723
724 if (!received && remote->tcp_socket) {
725 read_rc = read(remote->tcp_socket,
726 remote->buffer + remote->buffer_offset,
727 remote->buffer_size - remote->buffer_offset);
728 if (read_rc < 0) {
729 rc = errno;
730 }
731 received = true;
732 }
733
734 if (!received) {
735 crm_err("Remote connection type undetermined (bug?)");
736 return ESOCKTNOSUPPORT;
737 }
738
739 /* process any errors. */
740 if (read_rc > 0) {
741 remote->buffer_offset += read_rc;
742 /* always null terminate buffer, the +1 to alloc always allows for this. */
743 remote->buffer[remote->buffer_offset] = '\0';
744 crm_trace("Received %lld more bytes (%llu total)",
745 (long long) read_rc,
746 (unsigned long long) remote->buffer_offset);
747
748 } else if ((rc == EINTR) || (rc == EAGAIN)) {
749 crm_trace("No data available for non-blocking remote read: %s (%d)",
750 pcmk_rc_str(rc), rc);
751
752 } else if (read_rc == 0) {
753 crm_debug("End of remote data encountered after %llu bytes",
754 (unsigned long long) remote->buffer_offset);
755 return ENOTCONN;
756
757 } else {
758 crm_debug("Error receiving remote data after %llu bytes: %s (%d)",
759 (unsigned long long) remote->buffer_offset,
760 pcmk_rc_str(rc), rc);
761 return ENOTCONN;
762 }
763
764 header = localized_remote_header(remote);
765 if(header) {
766 if(remote->buffer_offset < header->size_total) {
767 crm_trace("Read partial remote message (%llu of %u bytes)",
768 (unsigned long long) remote->buffer_offset,
769 header->size_total);
770 } else {
771 crm_trace("Read full remote message of %llu bytes",
772 (unsigned long long) remote->buffer_offset);
773 return pcmk_rc_ok;
774 }
775 }
776
777 return EAGAIN;
778}
779
790int
792{
793 int rc = pcmk_rc_ok;
794 time_t start = time(NULL);
795 int remaining_timeout = 0;
796
797 if (timeout_ms == 0) {
798 timeout_ms = 10000;
799 } else if (timeout_ms < 0) {
800 timeout_ms = 60000;
801 }
802
803 remaining_timeout = timeout_ms;
804 while (remaining_timeout > 0) {
805
806 crm_trace("Waiting for remote data (%d ms of %d ms timeout remaining)",
807 remaining_timeout, timeout_ms);
808 rc = pcmk__remote_ready(remote, remaining_timeout);
809
810 if (rc == ETIME) {
811 crm_err("Timed out (%d ms) while waiting for remote data",
812 remaining_timeout);
813 return rc;
814
815 } else if (rc != pcmk_rc_ok) {
816 crm_debug("Wait for remote data aborted (will retry): %s "
817 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
818
819 } else {
820 rc = read_available_remote_data(remote);
821 if (rc == pcmk_rc_ok) {
822 return rc;
823 } else if (rc == EAGAIN) {
824 crm_trace("Waiting for more remote data");
825 } else {
826 crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
827 pcmk_rc_str(rc), rc);
828 }
829 }
830
831 // Don't waste time retrying after fatal errors
832 if ((rc == ENOTCONN) || (rc == ESOCKTNOSUPPORT)) {
833 return rc;
834 }
835
836 remaining_timeout = timeout_ms - ((time(NULL) - start) * 1000);
837 }
838 return ETIME;
839}
840
/* State carried between an asynchronous connect() attempt and the timer
 * callback that checks whether it has finished
 */
struct tcp_async_cb_data {
    int sock;           /* Socket that is being connected */
    int timeout_ms;     /* How long to keep waiting for the connection */
    time_t start;       /* When the attempt began (0 if connect() succeeded immediately) */
    void *userdata;     /* Passed through to the callback unchanged */
    void (*callback) (void *userdata, int rc, int sock);    /* Invoked with the final result */
};
848
849// \return TRUE if timer should be rescheduled, FALSE otherwise
850static gboolean
851check_connect_finished(gpointer userdata)
852{
853 struct tcp_async_cb_data *cb_data = userdata;
854 int rc;
855
856 fd_set rset, wset;
857 struct timeval ts = { 0, };
858
859 if (cb_data->start == 0) {
860 // Last connect() returned success immediately
861 rc = pcmk_rc_ok;
862 goto dispatch_done;
863 }
864
865 // If the socket is ready for reading or writing, the connect succeeded
866 FD_ZERO(&rset);
867 FD_SET(cb_data->sock, &rset);
868 wset = rset;
869 rc = select(cb_data->sock + 1, &rset, &wset, NULL, &ts);
870
871 if (rc < 0) { // select() error
872 rc = errno;
873 if ((rc == EINPROGRESS) || (rc == EAGAIN)) {
874 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
875 return TRUE; // There is time left, so reschedule timer
876 } else {
877 rc = ETIMEDOUT;
878 }
879 }
880 crm_trace("Could not check socket %d for connection success: %s (%d)",
881 cb_data->sock, pcmk_rc_str(rc), rc);
882
883 } else if (rc == 0) { // select() timeout
884 if ((time(NULL) - cb_data->start) < (cb_data->timeout_ms / 1000)) {
885 return TRUE; // There is time left, so reschedule timer
886 }
887 crm_debug("Timed out while waiting for socket %d connection success",
888 cb_data->sock);
889 rc = ETIMEDOUT;
890
891 // select() returned number of file descriptors that are ready
892
893 } else if (FD_ISSET(cb_data->sock, &rset)
894 || FD_ISSET(cb_data->sock, &wset)) {
895
896 // The socket is ready; check it for connection errors
897 int error = 0;
898 socklen_t len = sizeof(error);
899
900 if (getsockopt(cb_data->sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
901 rc = errno;
902 crm_trace("Couldn't check socket %d for connection errors: %s (%d)",
903 cb_data->sock, pcmk_rc_str(rc), rc);
904 } else if (error != 0) {
905 rc = error;
906 crm_trace("Socket %d connected with error: %s (%d)",
907 cb_data->sock, pcmk_rc_str(rc), rc);
908 } else {
909 rc = pcmk_rc_ok;
910 }
911
912 } else { // Should not be possible
913 crm_trace("select() succeeded, but socket %d not in resulting "
914 "read/write sets", cb_data->sock);
915 rc = EAGAIN;
916 }
917
918 dispatch_done:
919 if (rc == pcmk_rc_ok) {
920 crm_trace("Socket %d is connected", cb_data->sock);
921 } else {
922 close(cb_data->sock);
923 cb_data->sock = -1;
924 }
925
926 if (cb_data->callback) {
927 cb_data->callback(cb_data->userdata, rc, cb_data->sock);
928 }
929 free(cb_data);
930 return FALSE; // Do not reschedule timer
931}
932
951static int
952connect_socket_retry(int sock, const struct sockaddr *addr, socklen_t addrlen,
953 int timeout_ms, int *timer_id, void *userdata,
954 void (*callback) (void *userdata, int rc, int sock))
955{
956 int rc = 0;
957 int interval = 500;
958 int timer;
959 struct tcp_async_cb_data *cb_data = NULL;
960
961 rc = pcmk__set_nonblocking(sock);
962 if (rc != pcmk_rc_ok) {
963 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
964 pcmk_rc_str(rc), rc);
965 return rc;
966 }
967
968 rc = connect(sock, addr, addrlen);
969 if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
970 rc = errno;
971 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
972 pcmk_rc_str(rc), rc);
973 return rc;
974 }
975
976 cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
977 cb_data->userdata = userdata;
978 cb_data->callback = callback;
979 cb_data->sock = sock;
980 cb_data->timeout_ms = timeout_ms;
981
982 if (rc == 0) {
983 /* The connect was successful immediately, we still return to mainloop
984 * and let this callback get called later. This avoids the user of this api
985 * to have to account for the fact the callback could be invoked within this
986 * function before returning. */
987 cb_data->start = 0;
988 interval = 1;
989 } else {
990 cb_data->start = time(NULL);
991 }
992
993 /* This timer function does a non-blocking poll on the socket to see if we
994 * can use it. Once we can, the connect has completed. This method allows us
995 * to connect without blocking the mainloop.
996 *
997 * @TODO Use a mainloop fd callback for this instead of polling. Something
998 * about the way mainloop is currently polling prevents this from
999 * working at the moment though. (See connect(2) regarding EINPROGRESS
1000 * for possible new handling needed.)
1001 */
1002 crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
1003 interval, sock);
1004 timer = g_timeout_add(interval, check_connect_finished, cb_data);
1005 if (timer_id) {
1006 *timer_id = timer;
1007 }
1008
1009 // timer callback should be taking care of cb_data
1010 // cppcheck-suppress memleak
1011 return pcmk_rc_ok;
1012}
1013
1024static int
1025connect_socket_once(int sock, const struct sockaddr *addr, socklen_t addrlen)
1026{
1027 int rc = connect(sock, addr, addrlen);
1028
1029 if (rc < 0) {
1030 rc = errno;
1031 crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
1032 pcmk_rc_str(rc), rc);
1033 return rc;
1034 }
1035
1036 rc = pcmk__set_nonblocking(sock);
1037 if (rc != pcmk_rc_ok) {
1038 crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1039 pcmk_rc_str(rc), rc);
1040 return rc;
1041 }
1042
1043 return pcmk_ok;
1044}
1045
/*!
 * \internal
 * \brief Connect to a remote host over TCP
 *
 * Each address returned for \p host is tried in order. If \p callback is
 * given, the connection is made asynchronously and its result is reported
 * through the callback; otherwise the connection is made synchronously.
 *
 * \param[in]  host      Name or address of the host to connect to
 * \param[in]  port      TCP port to connect to
 * \param[in]  timeout   Timeout in milliseconds for an asynchronous connect
 * \param[out] timer_id  If not NULL, where to store the timer's source ID
 *                       for an asynchronous connect
 * \param[out] sock_fd   Where to store the socket (-1 if none is open)
 * \param[in]  userdata  Passed through to \p callback
 * \param[in]  callback  If not NULL, function to call with the final result
 *
 * \return Standard Pacemaker return code (pcmk_rc_ok if the socket is
 *         connected or an asynchronous attempt was started, EINVAL for
 *         invalid arguments, ENOTCONN if name lookup or every connection
 *         attempt failed, or the errno of the last socket() failure)
 */
int
pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id,
                     int *sock_fd, void *userdata,
                     void (*callback) (void *userdata, int rc, int sock))
{
    char buffer[INET6_ADDRSTRLEN];
    struct addrinfo *res = NULL;
    struct addrinfo *rp = NULL;
    struct addrinfo hints;
    const char *server = host;
    int rc;
    int sock = -1;

    CRM_CHECK((host != NULL) && (sock_fd != NULL), return EINVAL);

    // Get host's IP address(es)
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;   /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_CANONNAME;
    rc = getaddrinfo(server, NULL, &hints, &res);
    if (rc != 0) {
        crm_err("Unable to get IP address info for %s: %s",
                server, gai_strerror(rc));
        rc = ENOTCONN;
        goto async_cleanup;
    }
    if (!res || !res->ai_addr) {
        crm_err("Unable to get IP address info for %s: no result", server);
        rc = ENOTCONN;
        goto async_cleanup;
    }

    // getaddrinfo() returns a list of host's addresses, try them in order
    for (rp = res; rp != NULL; rp = rp->ai_next) {
        struct sockaddr *addr = rp->ai_addr;

        if (!addr) {
            continue;
        }

        if (rp->ai_canonname) {
            server = res->ai_canonname;
        }
        crm_debug("Got canonical name %s for %s", server, host);

        sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
        if (sock == -1) {
            rc = errno;
            crm_warn("Could not create socket for remote connection to %s:%d: "
                     "%s " CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
            continue;
        }

        /* Set port appropriately for address family */
        /* (void*) casts avoid false-positive compiler alignment warnings */
        if (addr->sa_family == AF_INET6) {
            ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
        } else {
            ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
        }

        memset(buffer, 0, PCMK__NELEM(buffer));
        pcmk__sockaddr2str(addr, buffer);
        crm_info("Attempting remote connection to %s:%d", buffer, port);

        /* Store each attempt's result in rc, so that a success after an
         * earlier address failed does not return the stale failure code.
         */
        if (callback) {
            rc = connect_socket_retry(sock, rp->ai_addr, rp->ai_addrlen,
                                      timeout, timer_id, userdata, callback);
            if (rc == pcmk_rc_ok) {
                goto async_cleanup; /* Success for now, we'll hear back later in the callback */
            }

        } else {
            rc = connect_socket_once(sock, rp->ai_addr, rp->ai_addrlen);
            if (rc == pcmk_rc_ok) {
                break;          /* Success */
            }
        }

        // Connect failed
        close(sock);
        sock = -1;
        rc = ENOTCONN;
    }

async_cleanup:

    if (res) {
        freeaddrinfo(res);
    }
    *sock_fd = sock;
    return rc;
}
1153
/*!
 * \internal
 * \brief Convert a socket address to its text representation
 *
 * \param[in]  sa  Socket address (a struct sockaddr or compatible structure)
 * \param[out] s   Where to store the text; must have room for at least
 *                 INET6_ADDRSTRLEN bytes
 *
 * \note For address families other than AF_INET and AF_INET6, the text is
 *       "<invalid>".
 */
void
pcmk__sockaddr2str(const void *sa, char *s)
{
    const struct sockaddr *generic = sa;

    if (generic->sa_family == AF_INET) {
        const struct sockaddr_in *ipv4 = sa;

        inet_ntop(AF_INET, &(ipv4->sin_addr), s, INET6_ADDRSTRLEN);

    } else if (generic->sa_family == AF_INET6) {
        const struct sockaddr_in6 *ipv6 = sa;

        inet_ntop(AF_INET6, &(ipv6->sin6_addr), s, INET6_ADDRSTRLEN);

    } else {
        strcpy(s, "<invalid>");
    }
}
1184
1194int
1196{
1197 int rc;
1198 struct sockaddr_storage addr;
1199 socklen_t laddr = sizeof(addr);
1200 char addr_str[INET6_ADDRSTRLEN];
1201
1202 /* accept the connection */
1203 memset(&addr, 0, sizeof(addr));
1204 *csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
1205 if (*csock == -1) {
1206 rc = errno;
1207 crm_err("Could not accept remote client connection: %s "
1208 CRM_XS " rc=%d", pcmk_rc_str(rc), rc);
1209 return rc;
1210 }
1211 pcmk__sockaddr2str(&addr, addr_str);
1212 crm_info("Accepted new remote client connection from %s", addr_str);
1213
1214 rc = pcmk__set_nonblocking(*csock);
1215 if (rc != pcmk_rc_ok) {
1216 crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1217 pcmk_rc_str(rc), rc);
1218 close(*csock);
1219 *csock = -1;
1220 return rc;
1221 }
1222
1223#ifdef TCP_USER_TIMEOUT
1224 if (pcmk__get_sbd_timeout() > 0) {
1225 // Time to fail and retry before watchdog
1226 unsigned int optval = (unsigned int) pcmk__get_sbd_timeout() / 2;
1227
1228 rc = setsockopt(*csock, SOL_TCP, TCP_USER_TIMEOUT,
1229 &optval, sizeof(optval));
1230 if (rc < 0) {
1231 rc = errno;
1232 crm_err("Could not set TCP timeout to %d ms on remote connection: "
1233 "%s " CRM_XS " rc=%d", optval, pcmk_rc_str(rc), rc);
1234 close(*csock);
1235 *csock = -1;
1236 return rc;
1237 }
1238 }
1239#endif
1240
1241 return rc;
1242}
1243
1249int
1251{
1252 static int port = 0;
1253
1254 if (port == 0) {
1255 const char *env = getenv("PCMK_remote_port");
1256
1257 if (env) {
1258 errno = 0;
1259 port = strtol(env, NULL, 10);
1260 if (errno || (port < 1) || (port > 65535)) {
1261 crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1262 env, DEFAULT_REMOTE_PORT);
1263 port = DEFAULT_REMOTE_PORT;
1264 }
1265 } else {
1266 port = DEFAULT_REMOTE_PORT;
1267 }
1268 }
1269 return port;
1270}
void gnutls_session_t
Definition: cib_remote.c:42
#define PCMK__NELEM(a)
Definition: internal.h:41
struct tcp_async_cb_data __attribute__
#define ENDIAN_LOCAL
Definition: remote.c:70
#define __swab32(x)
Definition: remote.c:52
uint32_t size_total
Definition: remote.c:4
uint32_t endian
Definition: remote.c:0
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:633
uint32_t payload_uncompressed
Definition: remote.c:7
#define REMOTE_MSG_VERSION
Definition: remote.c:69
int pcmk__remote_send_xml(pcmk__remote_t *remote, xmlNode *msg)
Definition: remote.c:488
uint32_t payload_compressed
Definition: remote.c:6
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1250
uint64_t id
Definition: remote.c:2
uint64_t flags
Definition: remote.c:3
int pcmk__accept_remote_connection(int ssock, int *csock)
Definition: remote.c:1195
#define __swab64(x)
Definition: remote.c:58
uint32_t version
Definition: remote.c:1
void pcmk__sockaddr2str(const void *sa, char *s)
Definition: remote.c:1166
uint32_t payload_offset
Definition: remote.c:5
xmlNode * pcmk__remote_message_xml(pcmk__remote_t *remote)
Definition: remote.c:540
int pcmk__connect_remote(const char *host, int port, int timeout, int *timer_id, int *sock_fd, void *userdata, void(*callback)(void *userdata, int rc, int sock))
Definition: remote.c:1063
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
Definition: remote.c:791
char * crm_strdup_printf(char const *format,...) G_GNUC_PRINTF(1
#define PCMK_GNUTLS_PRIORITIES
Definition: config.h:526
pcmk__cpg_host_t host
Definition: cpg.c:4
A dumping ground.
int pcmk__set_nonblocking(int fd)
Definition: io.c:518
#define crm_info(fmt, args...)
Definition: logging.h:362
#define crm_warn(fmt, args...)
Definition: logging.h:360
#define CRM_XS
Definition: logging.h:55
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:211
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:227
#define crm_debug(fmt, args...)
Definition: logging.h:364
#define crm_err(fmt, args...)
Definition: logging.h:359
#define crm_trace(fmt, args...)
Definition: logging.h:365
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:51
Wrappers for and extensions to glib mainloop.
long pcmk__get_sbd_timeout(void)
Definition: watchdog.c:240
unsigned int timeout
Definition: pcmk_fence.c:32
#define ETIME
Definition: portability.h:150
const char * bz2_strerror(int rc)
Definition: results.c:823
const char * pcmk_strerror(int rc)
Definition: results.c:148
#define CRM_ASSERT(expr)
Definition: results.h:42
const char * pcmk_rc_str(int rc)
Get a user-friendly description of a return code.
Definition: results.c:476
@ pcmk_rc_ok
Definition: results.h:148
#define pcmk_ok
Definition: results.h:68
int pcmk_legacy2rc(int legacy_rc)
Definition: results.c:534
int pcmk__scan_min_int(const char *text, int *result, int minimum)
Definition: strings.c:127
struct pcmk__remote_s * remote
Definition: ipc_internal.h:185
size_t buffer_offset
Definition: ipc_internal.h:109
size_t buffer_size
Definition: ipc_internal.h:108
Wrappers for and extensions to libxml2.
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:2121
xmlNode * string2xml(const char *input)
Definition: xml.c:930