Dear Euler,

Thanks for updating the patch!

> v24-0003: as I said I don't think we need to add it, however, I won't fight
> against it if people want to add this check.

OK, let's wait for comments from senior members.

> Since I applied v24-0004, I realized that extra start / stop service are
> required. It mean pg_createsubscriber doesn't start the transformation with the
> current standby settings. Instead, it stops the standby if it is running and
> start it with the provided command-line options (socket, port,
> listen_addresses). It has a few drawbacks:
> * See v34-0012. It cannot detect if the target server is a primary for another
>   server. It is documented.

Yeah, it is collateral damage.

> * I also removed the check for standby is running. If the standby was stopped a
>   long time ago, it will take some time to reach the start point.
> * Dry run mode has to start / stop the service to work correctly. Is it an
>   issue?

One concern (see the comment below) is that the -l option would not be passed even if the standby had been writing a log file before pg_createsubscriber was run. Also, settings passed via "pg_ctl start -o ..." would not be restored.

> However, I decided to include --retain option, I'm thinking about to remove it.
> If the logging is enabled, the information during the pg_createsubscriber will
> be available. The client log can be redirected to a file for future inspection.

Just to confirm: you mean the following, right?
* the client output would be redirected, and
* the -r option would be removed.

Here are my initial comments for v25-0001. I read the new documentation and it looks very good. I may review v25-0001 further, but feel free to revise it in the meantime.

01. cleanup_objects_atexit

```
PGconn *conn;
int i;
```

The declaration of conn can be moved into the for-loop. Also, the loop variable i can be declared in the for statement itself.

02. cleanup_objects_atexit

```
/*
 * If a connection could not be established, inform the user
 * that some objects were left on primary and should be
 * removed before trying again.
 */
if (dbinfo[i].made_publication)
{
	pg_log_warning("There might be a publication \"%s\" in database \"%s\" on primary",
				   dbinfo[i].pubname, dbinfo[i].dbname);
	pg_log_warning_hint("Consider dropping this publication before trying again.");
}
if (dbinfo[i].made_replslot)
{
	pg_log_warning("There might be a replication slot \"%s\" in database \"%s\" on primary",
				   dbinfo[i].subname, dbinfo[i].dbname);
	pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
}
```

Not sure which is better, but we might be able to write the list to a concrete file, as pg_upgrade does. (I thought this had already been discussed, but could not find it in the archive. Sorry if this is a duplicate comment.)

03. main

```
while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v",
						long_options, &option_index)) != -1)
```

The short-options string was not updated to match long_options: "p:" and "U:" are missing, and "S:" does not match the 's' case handled in the switch.

04. main

```
case 'D':
	opt.subscriber_dir = pg_strdup(optarg);
	canonicalize_path(opt.subscriber_dir);
	break;
...
case 'P':
	opt.pub_conninfo_str = pg_strdup(optarg);
	break;
...
case 's':
	opt.socket_dir = pg_strdup(optarg);
	break;
...
case 'U':
	opt.sub_username = pg_strdup(optarg);
	break;
```

Should we consider the case where these options are specified twice? I.e., should we call pg_free() before the assignment?

05. main

canonicalize_path() is not called for socket_dir.

06. main

```
/*
 * If socket directory is not provided, use the current directory.
 */
```

A one-line comment can be used. The trailing period can also be removed then.

07. main

```
/*
 *
 * If subscriber username is not provided, check if the environment
 * variable sets it. If not, obtain the operating system name of the user
 * running it.
 */
```

Unnecessary blank line.

08. main

```
char *errstr = NULL;
```

This declaration can be moved into the else branch.

09. main

Also, in the first place, do we need to obtain the username when it is not specified? I feel libpq can handle that case if we simply do not pass it.

10. main

```
appendPQExpBuffer(sub_conninfo_str, "host=%s port=%u user=%s fallback_application_name=%s",
				  opt.socket_dir, opt.sub_port, opt.sub_username, progname);
sub_base_conninfo = get_base_conninfo(sub_conninfo_str->data, NULL);
```

Is it really necessary to call get_base_conninfo() here? I think sub_base_conninfo does not need to be defined.

11. main

```
/*
 * In dry run mode, the server is restarted with the provided command-line
 * options so validation can be applied in the target server. In order to
 * preserve the initial state of the server (running), start it without
 * the command-line options.
 */
if (dry_run)
	start_standby_server(&opt, pg_ctl_path, NULL, false);
```

I think the initial state of the server may be stopped, since both states are now allowed. Also, I think it is not good to omit the log file.

12. others

As Peter E pointed out [1], the main function is still huge: it has more than 400 lines. I think every function should have fewer than 100 lines to keep the code readable. I considered a split like the one below. Note that this may require changing the order of some steps. What do you think?

* add parse_command_options(), which accepts user options and verifies them
* add verification_phase() or similar, which checks the system identifier and calls check_XXX
* add catchup_phase() or similar, which creates a temporary slot, writes recovery parameters, and waits until the end of recovery
* add cleanup_phase() or similar, which removes primary_slot and modifies the system identifier
* stop/start server can be combined into one wrapper.

The attached txt file is a proof of concept.

13. others

PQresultStatus(res) is called 17 times in this source file, which may be redundant. I think we can introduce a function like executeQueryOrDie() and gather the checks in one place.

14. others

I found that pg_createsubscriber does not use functions declared in other files. Is it possible to reuse some of them, e.g., those in streamutils.h?

15. others

While reading the old discussions [2], I saw that Amit suggested keeping the comment and avoiding the creation of a temporary slot. You said "Got it", but the temporary slot still exists. Is there any reason? Can you clarify your opinion?

16. others

While reading [2] and [3], I was confused about the decision. You and Amit discussed the combination of pg_createsubscriber and slot sync, and how slots on the physical standby should be handled. You seemed to agree to remove such slots, and Amit also suggested raising an ERROR. However, you said in [8] that such handling is not mandatory, so a WARNING should be raised in dry_run mode. I was quite confused. Am I missing something?

17. others

Per the discussion around [4], we might have to consider the case where some options like data_directory and config_file were initially specified for the standby server. Another easy approach is to allow users to specify options like -o in pg_upgrade [5], which is similar to your idea. Thoughts?

18. others

How do you handle the reported failure [6]?

19. main

```
char *pub_base_conninfo = NULL;
char *sub_base_conninfo = NULL;
char *dbname_conninfo = NULL;
```

There is no need to initialize pub_base_conninfo and sub_base_conninfo, since these variables are never freed.

20. others

IIUC, slot creation would not finish if there are prepared transactions. Should we detect this in the verification phase and raise an ERROR?

21. others

As I said in [7], the catch-up would not finish if a long recovery_min_apply_delay is used. Should we override it during the catch-up?

22. pg_createsubscriber.sgml

```
<para>
 Check
 Write recovery parameters into the target data...
```

Not sure, but "Check" seems unnecessary.

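For comments 01 and 02, here is a minimal standalone sketch of what a file-based report could look like, with the loop index and the per-iteration pointer declared inside the loop. It is only an illustration under assumptions: struct LeftoverInfo is a hypothetical, trimmed-down stand-in for struct LogicalRepInfo, write_leftover_objects() is a hypothetical helper, and the line format is made up. The real code would still need to decide where the file lives.

```c
#include <stdbool.h>
#include <stdio.h>

/*
 * Hypothetical stand-in for struct LogicalRepInfo, keeping only the fields
 * that the warnings in cleanup_objects_atexit() use.
 */
struct LeftoverInfo
{
	const char *dbname;
	const char *pubname;
	const char *subname;		/* the replication slot uses this name */
	bool		made_publication;
	bool		made_replslot;
};

/*
 * Hypothetical helper: write one line per object that may have been left on
 * the primary and return the number of lines written.  The loop index and
 * the per-database pointer are declared where they are used (comment 01).
 */
static int
write_leftover_objects(FILE *out, const struct LeftoverInfo *info, int num)
{
	int			nwritten = 0;

	for (int i = 0; i < num; i++)
	{
		const struct LeftoverInfo *cur = &info[i];

		if (cur->made_publication)
		{
			fprintf(out, "publication \"%s\" in database \"%s\"\n",
					cur->pubname, cur->dbname);
			nwritten++;
		}
		if (cur->made_replslot)
		{
			fprintf(out, "replication slot \"%s\" in database \"%s\"\n",
					cur->subname, cur->dbname);
			nwritten++;
		}
	}
	return nwritten;
}
```

With such a helper, cleanup_objects_atexit() could emit a single warning naming the file instead of one warning per object, similar in spirit to how pg_upgrade reports problem objects.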
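For comments 03 and 04, the following standalone sketch shows a short-options string kept in sync with the long_options table, and string options that release the previous copy when given twice. Judging from the long_options table in the patch, the full string would presumably be "d:D:np:P:rs:t:U:v"; the sketch handles only a subset. struct SketchOptions, set_string_option() and parse_sketch_options() are hypothetical names, and plain malloc()/free() stand in for pg_strdup()/pg_free().

```c
#include <getopt.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical subset of struct CreateSubscriberOptions. */
struct SketchOptions
{
	char	   *subscriber_dir;
	char	   *socket_dir;
	char	   *sub_username;
	bool		dry_run;
};

/*
 * Store a copy of "value" in *dst, releasing the copy kept from an earlier
 * occurrence of the same option (free(NULL) is a no-op).
 */
static void
set_string_option(char **dst, const char *value)
{
	size_t		len = strlen(value) + 1;
	char	   *copy = malloc(len);

	if (copy == NULL)
		abort();				/* pg_strdup() would report out of memory */
	memcpy(copy, value, len);
	free(*dst);
	*dst = copy;
}

/* Returns 0 on success, -1 on an unknown option or a stray argument. */
static int
parse_sketch_options(int argc, char **argv, struct SketchOptions *opt)
{
	static const struct option long_options[] = {
		{"pgdata", required_argument, NULL, 'D'},
		{"dry-run", no_argument, NULL, 'n'},
		{"socket-directory", required_argument, NULL, 's'},
		{"subscriber-username", required_argument, NULL, 'U'},
		{NULL, 0, NULL, 0}
	};
	int			c;

	/* Each short option above appears here, with ':' if it takes an argument */
	while ((c = getopt_long(argc, argv, "D:ns:U:", long_options, NULL)) != -1)
	{
		switch (c)
		{
			case 'D':
				set_string_option(&opt->subscriber_dir, optarg);
				break;
			case 'n':
				opt->dry_run = true;
				break;
			case 's':
				set_string_option(&opt->socket_dir, optarg);
				break;
			case 'U':
				set_string_option(&opt->sub_username, optarg);
				break;
			default:
				return -1;		/* getopt_long() already printed a complaint */
		}
	}
	return (optind < argc) ? -1 : 0;
}
```

For example, "-D /tmp/a --pgdata=/tmp/b" leaves subscriber_dir as "/tmp/b" and frees the first copy. Whether this is worth doing in a short-lived tool is exactly the open question in comment 04.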
[1]: https://www.postgresql.org/message-id/b9aa614c-84ba-a869-582f-8d5e3ab57424%40enterprisedb.com
[2]: https://www.postgresql.org/message-id/9fd3018d-0e5f-4507-aee6-efabfb5a4440%40app.fastmail.com
[3]: https://www.postgresql.org/message-id/CAA4eK1L%2BE-bdKaOMSw-yWizcuprKMyeejyOwWjq_57%3DUqh-f%2Bg%40mail.gmail.com
[4]: https://www.postgresql.org/message-id/TYCPR01MB12077B63D81B49E9DFD323661F55A2%40TYCPR01MB12077.jpnprd01.prod.outlook.com
[5]: https://www.postgresql.org/docs/devel/pgupgrade.html#:~:text=options%20to%20be%20passed%20directly%20to%20the%20old%20postgres%20command%3B%20multiple%20option%20invocations%20are%20appended
[6]: https://www.postgresql.org/message-id/CAHv8Rj%2B5mzK9Jt%2B7ECogJzfm5czvDCCd5jO1_rCx0bTEYpBE5g%40mail.gmail.com
[7]: https://www.postgresql.org/message-id/OS3PR01MB98828B15DD9502C91E0C50D7F57D2%40OS3PR01MB9882.jpnprd01.prod.outlook.com
[8]: https://www.postgresql.org/message-id/be92c57b-82e1-4920-ac31-a8a04206db7b%40app.fastmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/global/

From 5866926dd581881af6b75c41e858125f9427b4e6 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Wed, 6 Mar 2024 06:58:48 +0000
Subject: [PATCH] Shorten main function

---
 src/bin/pg_basebackup/pg_createsubscriber.c | 516 +++++++++++---------
 1 file changed, 281 insertions(+), 235 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e70fc5dca0..80d76a78ce 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -70,8 +70,7 @@ static PGconn *connect_database(const char *conninfo, bool exit_on_error);
 static void disconnect_database(PGconn *conn, bool exit_on_error);
 static uint64 get_primary_sysid(const char *conninfo);
 static uint64 get_standby_sysid(const char *datadir);
-static void modify_subscriber_sysid(const char *pg_resetwal_path,
-									struct CreateSubscriberOptions *opt);
+static void modify_subscriber_sysid(struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static void check_publisher(struct LogicalRepInfo *dbinfo);
 static void setup_publisher(struct LogicalRepInfo *dbinfo);
@@ -86,10 +85,12 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 static char *setup_server_logfile(const char *datadir);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
 static void start_standby_server(struct CreateSubscriberOptions *opt,
-								 const char *pg_ctl_path, const char *logfile,
+								 const char *logfile,
 								 bool with_options);
-static void stop_standby_server(const char *pg_ctl_path, const char *datadir);
-static void wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
+static void stop_standby_server(const char *datadir);
+static void restart_server(struct CreateSubscriberOptions *options,
+						   const char *logfile);
+static void wait_for_end_recovery(const char *conninfo,
 								  struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -97,11 +98,20 @@ static void create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void parse_command_option(int argc, char **argv,
+								 struct CreateSubscriberOptions *options);
+static void verification_phase(struct CreateSubscriberOptions *options);
+static char *catchup_phase(struct CreateSubscriberOptions *options,
+						   char *server_start_log);
+static void cleanup_phase(struct CreateSubscriberOptions *options,
+						  char *server_start_log);
 
 #define USEC_PER_SEC 1000000
 #define WAIT_INTERVAL 1 /* 1 second */
 
 static const char *progname;
+static const char *pg_ctl_path;
+static const char *pg_resetwal_path;
 
 static char *primary_slot_name = NULL;
 static bool dry_run = false;
@@ -521,7 +531,7 @@ get_standby_sysid(const char *datadir)
  * files from one of the systems might be used in the other one.
  */
 static void
-modify_subscriber_sysid(const char *pg_resetwal_path, struct CreateSubscriberOptions *opt)
+modify_subscriber_sysid(struct CreateSubscriberOptions *opt)
 {
 	ControlFileData *cf;
 	bool		crc_ok;
@@ -1163,8 +1173,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 }
 
 static void
-start_standby_server(struct CreateSubscriberOptions *opt, const char *pg_ctl_path,
-					 const char *logfile, bool with_options)
+start_standby_server(struct CreateSubscriberOptions *opt, const char *logfile,
+					 bool with_options)
 {
 	PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
 	char		socket_string[MAXPGPATH + 200];
@@ -1210,7 +1220,7 @@ start_standby_server(struct CreateSubscriberOptions *opt, const char *pg_ctl_pat
 }
 
 static void
-stop_standby_server(const char *pg_ctl_path, const char *datadir)
+stop_standby_server(const char *datadir)
 {
 	char	   *pg_ctl_cmd;
 	int			rc;
@@ -1223,6 +1233,25 @@ stop_standby_server(const char *pg_ctl_path, const char *datadir)
 	pg_log_info("server was stopped");
 }
 
+/*
+ * Wrapper for stop_standby_server() and start_standby_server()
+ */
+static void
+restart_server(struct CreateSubscriberOptions *options, const char *logfile)
+{
+	struct stat statbuf;
+	char		pidfile[MAXPGPATH];
+
+	/* Subscriber PID file */
+	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", options->subscriber_dir);
+
+	/* If the standby server is running, stop it */
+	if (stat(pidfile, &statbuf) == 0)
+		stop_standby_server(options->subscriber_dir);
+
+	start_standby_server(options, logfile, true);
+}
+
 /*
  * Returns after the server finishes the recovery process.
  *
@@ -1230,7 +1259,7 @@ stop_standby_server(const char *pg_ctl_path, const char *datadir)
  * the recovery process. By default, it waits forever.
  */
 static void
-wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
+wait_for_end_recovery(const char *conninfo,
 					  struct CreateSubscriberOptions *opt)
 {
 	PGconn	   *conn;
@@ -1272,7 +1301,7 @@ wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
 		{
 			if (++count > NUM_CONN_ATTEMPTS)
 			{
-				stop_standby_server(pg_ctl_path, opt->subscriber_dir);
+				stop_standby_server(opt->subscriber_dir);
 				pg_log_error("standby server disconnected from the primary");
 				break;
 			}
@@ -1285,7 +1314,7 @@ wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
 		/* Bail out after recovery_timeout seconds if this option is set */
 		if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
 		{
-			stop_standby_server(pg_ctl_path, opt->subscriber_dir);
+			stop_standby_server(opt->subscriber_dir);
 			pg_log_error("recovery timed out");
 			disconnect_database(conn, true);
 		}
@@ -1581,165 +1610,20 @@ enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
-int
-main(int argc, char **argv)
+/*
+ * Verify the input arguments are appropriate.
+ */
+static void
+verify_input_arguments(struct CreateSubscriberOptions *options)
 {
-	static struct option long_options[] =
-	{
-		{"database", required_argument, NULL, 'd'},
-		{"pgdata", required_argument, NULL, 'D'},
-		{"dry-run", no_argument, NULL, 'n'},
-		{"subscriber-port", required_argument, NULL, 'p'},
-		{"publisher-server", required_argument, NULL, 'P'},
-		{"retain", no_argument, NULL, 'r'},
-		{"socket-directory", required_argument, NULL, 's'},
-		{"recovery-timeout", required_argument, NULL, 't'},
-		{"subscriber-username", required_argument, NULL, 'U'},
-		{"verbose", no_argument, NULL, 'v'},
-		{"version", no_argument, NULL, 'V'},
-		{"help", no_argument, NULL, '?'},
-		{NULL, 0, NULL, 0}
-	};
-
-	struct CreateSubscriberOptions opt = {0};
-
-	int			c;
-	int			option_index;
-
-	char	   *pg_ctl_path = NULL;
-	char	   *pg_resetwal_path = NULL;
-
-	char	   *server_start_log;
-
-	char	   *pub_base_conninfo = NULL;
-	char	   *sub_base_conninfo = NULL;
 	char	   *dbname_conninfo = NULL;
-
-	uint64		pub_sysid;
-	uint64		sub_sysid;
-	struct stat statbuf;
-
-	PGconn	   *conn;
-	char	   *consistent_lsn;
-
-	PQExpBuffer sub_conninfo_str = createPQExpBuffer();
-	PQExpBuffer recoveryconfcontents = NULL;
-
-	char		pidfile[MAXPGPATH];
-
-	pg_logging_init(argv[0]);
-	pg_logging_set_level(PG_LOG_WARNING);
-	progname = get_progname(argv[0]);
-	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
-
-	if (argc > 1)
-	{
-		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
-		{
-			usage();
-			exit(0);
-		}
-		else if (strcmp(argv[1], "-V") == 0
-				 || strcmp(argv[1], "--version") == 0)
-		{
-			puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
-			exit(0);
-		}
-	}
-
-	/* Default settings */
-	opt.subscriber_dir = NULL;
-	opt.pub_conninfo_str = NULL;
-	opt.socket_dir = NULL;
-	opt.sub_port = DEFAULT_SUB_PORT;
-	opt.sub_username = NULL;
-	opt.database_names = (SimpleStringList)
-	{
-		NULL, NULL
-	};
-	opt.retain = false;
-	opt.recovery_timeout = 0;
-
-	/*
-	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
It uses pg_ctl which does not allow - * it either. - */ -#ifndef WIN32 - if (geteuid() == 0) - { - pg_log_error("cannot be executed by \"root\""); - pg_log_error_hint("You must run %s as the PostgreSQL superuser.", - progname); - exit(1); - } -#endif - - get_restricted_token(); - - while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v", - long_options, &option_index)) != -1) - { - switch (c) - { - case 'd': - /* Ignore duplicated database names */ - if (!simple_string_list_member(&opt.database_names, optarg)) - { - simple_string_list_append(&opt.database_names, optarg); - num_dbs++; - } - break; - case 'D': - opt.subscriber_dir = pg_strdup(optarg); - canonicalize_path(opt.subscriber_dir); - break; - case 'n': - dry_run = true; - break; - case 'p': - if ((opt.sub_port = atoi(optarg)) <= 0) - pg_fatal("invalid subscriber port number"); - break; - case 'P': - opt.pub_conninfo_str = pg_strdup(optarg); - break; - case 'r': - opt.retain = true; - break; - case 's': - opt.socket_dir = pg_strdup(optarg); - break; - case 't': - opt.recovery_timeout = atoi(optarg); - break; - case 'U': - opt.sub_username = pg_strdup(optarg); - break; - case 'v': - pg_logging_increase_verbosity(); - break; - default: - /* getopt_long already emitted a complaint */ - pg_log_error_hint("Try \"%s --help\" for more information.", progname); - exit(1); - } - } - - /* - * Any non-option arguments? 
-	 */
-	if (optind < argc)
-	{
-		pg_log_error("too many command-line arguments (first is \"%s\")",
-					 argv[optind]);
-		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
-		exit(1);
-	}
+	char	   *pub_base_conninfo;
+	PQExpBuffer sub_conninfo_str = createPQExpBuffer();
 
 	/*
 	 * Required arguments
 	 */
-	if (opt.subscriber_dir == NULL)
+	if (options->subscriber_dir == NULL)
 	{
 		pg_log_error("no subscriber data directory specified");
 		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -1749,14 +1633,14 @@ main(int argc, char **argv)
 	/*
 	 * If socket directory is not provided, use the current directory.
 	 */
-	if (opt.socket_dir == NULL)
+	if (options->socket_dir == NULL)
 	{
 		char		cwd[MAXPGPATH];
 
 		if (!getcwd(cwd, MAXPGPATH))
 			pg_fatal("could not determine current directory");
-		opt.socket_dir = pg_strdup(cwd);
-		canonicalize_path(opt.socket_dir);
+		options->socket_dir = pg_strdup(cwd);
+		canonicalize_path(options->socket_dir);
 	}
 
 	/*
@@ -1765,17 +1649,17 @@ main(int argc, char **argv)
 	 * variable sets it. If not, obtain the operating system name of the user
 	 * running it.
 	 */
-	if (opt.sub_username == NULL)
+	if (options->sub_username == NULL)
 	{
 		char	   *errstr = NULL;
 
 		if (getenv("PGUSER"))
 		{
-			opt.sub_username = getenv("PGUSER");
+			options->sub_username = getenv("PGUSER");
 		}
 		else
 		{
-			opt.sub_username = get_user_name(&errstr);
+			options->sub_username = get_user_name(&errstr);
 			if (errstr)
 				pg_fatal("%s", errstr);
 		}
@@ -1785,7 +1669,7 @@ main(int argc, char **argv)
 	 * Parse connection string. Build a base connection string that might be
 	 * reused by multiple databases.
 	 */
-	if (opt.pub_conninfo_str == NULL)
+	if (options->pub_conninfo_str == NULL)
 	{
 		/*
 		 * TODO use primary_conninfo (if available) from subscriber and
@@ -1798,19 +1682,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 	pg_log_info("validating connection string on publisher");
-	pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
+	pub_base_conninfo = get_base_conninfo(options->pub_conninfo_str,
 										  &dbname_conninfo);
 	if (pub_base_conninfo == NULL)
 		exit(1);
 
 	pg_log_info("validating connection string on subscriber");
 	appendPQExpBuffer(sub_conninfo_str, "host=%s port=%u user=%s fallback_application_name=%s",
-					  opt.socket_dir, opt.sub_port, opt.sub_username, progname);
-	sub_base_conninfo = get_base_conninfo(sub_conninfo_str->data, NULL);
-	if (sub_base_conninfo == NULL)
-		exit(1);
+					  options->socket_dir, options->sub_port, options->sub_username, progname);
 
-	if (opt.database_names.head == NULL)
+	if (options->database_names.head == NULL)
 	{
 		pg_log_info("no database was specified");
 
@@ -1821,7 +1702,7 @@ main(int argc, char **argv)
 		 */
 		if (dbname_conninfo)
 		{
-			simple_string_list_append(&opt.database_names, dbname_conninfo);
+			simple_string_list_append(&options->database_names, dbname_conninfo);
 			num_dbs++;
 
 			pg_log_info("database \"%s\" was extracted from the publisher connection string",
@@ -1836,58 +1717,134 @@ main(int argc, char **argv)
 		}
 	}
 
-	/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
-	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
-	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
 
 	/* Rudimentary check for a data directory */
-	check_data_directory(opt.subscriber_dir);
+	check_data_directory(options->subscriber_dir);
 
 	/*
 	 * Store database information for publisher and subscriber. It should be
 	 * called before atexit() because its return is used in the
 	 * cleanup_objects_atexit().
 	 */
-	dbinfo = store_pub_sub_info(opt.database_names, pub_base_conninfo,
-								sub_base_conninfo);
+	dbinfo = store_pub_sub_info(options->database_names, pub_base_conninfo,
+								sub_conninfo_str->data);
 
-	/* Register a function to clean up objects in case of failure */
-	atexit(cleanup_objects_atexit);
+	pfree(dbname_conninfo);
+	pfree(pub_base_conninfo);
+	destroyPQExpBuffer(sub_conninfo_str);
+}
 
-	/*
-	 * Check if the subscriber data directory has the same system identifier
-	 * than the publisher data directory.
-	 */
-	pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
-	sub_sysid = get_standby_sysid(opt.subscriber_dir);
-	if (pub_sysid != sub_sysid)
-		pg_fatal("subscriber data directory is not a copy of the source database cluster");
+/*
+ * Parse command-line options and store into CreateSubscriberOptions.
+ */
+static void
+parse_command_option(int argc, char **argv, struct CreateSubscriberOptions *options)
+{
+	static struct option long_options[] =
+	{
+		{"database", required_argument, NULL, 'd'},
+		{"pgdata", required_argument, NULL, 'D'},
+		{"dry-run", no_argument, NULL, 'n'},
+		{"subscriber-port", required_argument, NULL, 'p'},
+		{"publisher-server", required_argument, NULL, 'P'},
+		{"retain", no_argument, NULL, 'r'},
+		{"socket-directory", required_argument, NULL, 's'},
+		{"recovery-timeout", required_argument, NULL, 't'},
+		{"subscriber-username", required_argument, NULL, 'U'},
+		{"verbose", no_argument, NULL, 'v'},
+		{"version", no_argument, NULL, 'V'},
+		{"help", no_argument, NULL, '?'},
+		{NULL, 0, NULL, 0}
+	};
 
-	/* Create the output directory to store any data generated by this tool */
-	server_start_log = setup_server_logfile(opt.subscriber_dir);
+	int			c;
+	int			option_index;
 
-	/* Subscriber PID file */
-	snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", opt.subscriber_dir);
+	get_restricted_token();
+
+	while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'd':
+				/* Ignore duplicated database names */
+				if (!simple_string_list_member(&options->database_names, optarg))
+				{
+					simple_string_list_append(&options->database_names, optarg);
+					num_dbs++;
+				}
+				break;
+			case 'D':
+				options->subscriber_dir = pg_strdup(optarg);
+				canonicalize_path(options->subscriber_dir);
+				break;
+			case 'n':
+				dry_run = true;
+				break;
+			case 'p':
+				if ((options->sub_port = atoi(optarg)) <= 0)
+					pg_fatal("invalid subscriber port number");
+				break;
+			case 'P':
+				options->pub_conninfo_str = pg_strdup(optarg);
+				break;
+			case 'r':
+				options->retain = true;
+				break;
+			case 's':
+				options->socket_dir = pg_strdup(optarg);
+				break;
+			case 't':
+				options->recovery_timeout = atoi(optarg);
+				break;
+			case 'U':
+				options->sub_username = pg_strdup(optarg);
+				break;
+			case 'v':
+				pg_logging_increase_verbosity();
+				break;
+			default:
+				/* getopt_long already emitted a complaint */
+				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				exit(1);
+		}
+	}
 
 	/*
-	 * If the standby server is running, stop it. Some parameters (that can
-	 * only be set at server start) are informed by command-line options.
+	 * Any non-option arguments?
 	 */
-	if (stat(pidfile, &statbuf) == 0)
+	if (optind < argc)
 	{
-
-		pg_log_info("standby is up and running");
-		pg_log_info("stopping the server to start the transformation steps");
-		stop_standby_server(pg_ctl_path, opt.subscriber_dir);
+		pg_log_error("too many command-line arguments (first is \"%s\")",
+					 argv[optind]);
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit(1);
 	}
 
+	verify_input_arguments(options);
+
+	/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
+	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
+	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
+}
+
+/*
+ * Check whether nodes can be a logical replication cluster
+ */
+static void
+verification_phase(struct CreateSubscriberOptions *options)
+{
+	uint64		pub_sysid;
+	uint64		sub_sysid;
+
 	/*
-	 * Start a short-lived standby server with temporary parameters (provided
-	 * by command-line options). The goal is to avoid connections during the
-	 * transformation steps.
+	 * Check if the subscriber data directory has the same system identifier
+	 * than the publisher data directory.
 	 */
-	pg_log_info("starting the standby with command-line options");
-	start_standby_server(&opt, pg_ctl_path, server_start_log, true);
+	pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+	sub_sysid = get_standby_sysid(options->subscriber_dir);
+	if (pub_sysid != sub_sysid)
+		pg_fatal("subscriber data directory is not a copy of the source database cluster");
 
 	/* Check if the standby server is ready for logical replication */
 	check_subscriber(dbinfo);
@@ -1899,14 +1856,17 @@ main(int argc, char **argv)
 	 * called after it.
 	 */
 	check_publisher(dbinfo);
+}
 
-	/*
-	 * Create the required objects for each database on publisher. This step
-	 * is here mainly because if we stop the standby we cannot verify if the
-	 * primary slot is in use. We could use an extra connection for it but it
-	 * doesn't seem worth.
-	 */
-	setup_publisher(dbinfo);
+/*
+ * Ensure the target server is caught up to the primary
+ */
+static char *
+catchup_phase(struct CreateSubscriberOptions *options, char *server_start_log)
+{
+	PGconn	   *conn;
+	char	   *consistent_lsn;
+	PQExpBuffer recoveryconfcontents = NULL;
 
 	/*
 	 * Create a temporary logical replication slot to get a consistent LSN.
@@ -1959,7 +1919,7 @@ main(int argc, char **argv)
 	{
 		appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
 						  consistent_lsn);
-		WriteRecoveryConfig(conn, opt.subscriber_dir, recoveryconfcontents);
+		WriteRecoveryConfig(conn, options->subscriber_dir, recoveryconfcontents);
 	}
 	disconnect_database(conn, false);
 
@@ -1970,20 +1930,18 @@ main(int argc, char **argv)
 	 * until accepting connections.
 	 */
 	pg_log_info("stopping and starting the subscriber");
-	stop_standby_server(pg_ctl_path, opt.subscriber_dir);
-	start_standby_server(&opt, pg_ctl_path, server_start_log, true);
+	restart_server(options, server_start_log);
 
 	/* Waiting the subscriber to be promoted */
-	wait_for_end_recovery(dbinfo[0].subconninfo, pg_ctl_path, &opt);
+	wait_for_end_recovery(dbinfo[0].subconninfo, options);
 
-	/*
-	 * Create the subscription for each database on subscriber. It does not
-	 * enable it immediately because it needs to adjust the logical
-	 * replication start point to the LSN reported by consistent_lsn (see
-	 * set_replication_progress). It also cleans up publications created by
-	 * this tool and replication to the standby.
-	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	return consistent_lsn;
+}
+
+static void
+cleanup_phase(struct CreateSubscriberOptions *options, char *server_start_log)
+{
+	PGconn	   *conn;
 
 	/*
 	 * If the primary_slot_name exists on primary, drop it.
@@ -2009,10 +1967,10 @@ main(int argc, char **argv)
 
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
-	stop_standby_server(pg_ctl_path, opt.subscriber_dir);
+	stop_standby_server(options->subscriber_dir);
 
 	/* Change system identifier from subscriber */
-	modify_subscriber_sysid(pg_resetwal_path, &opt);
+	modify_subscriber_sysid(options);
 
 	/*
 	 * In dry run mode, the server is restarted with the provided command-line
@@ -2021,14 +1979,102 @@ main(int argc, char **argv)
 	 * the command-line options.
 	 */
 	if (dry_run)
-		start_standby_server(&opt, pg_ctl_path, NULL, false);
+		start_standby_server(options, NULL, false);
 
 	/*
 	 * The log file is kept if retain option is specified or this tool does
 	 * not run successfully. Otherwise, log file is removed.
 	 */
-	if (!dry_run && !opt.retain)
+	if (!dry_run && !options->retain)
 		unlink(server_start_log);
+}
+
+int
+main(int argc, char **argv)
+{
+	struct CreateSubscriberOptions opt = {0};
+	char	   *server_start_log;
+	char	   *consistent_lsn;
+
+	pg_logging_init(argv[0]);
+	pg_logging_set_level(PG_LOG_WARNING);
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
+
+	if (argc > 1)
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	/* Default settings */
+	opt.subscriber_dir = NULL;
+	opt.pub_conninfo_str = NULL;
+	opt.socket_dir = NULL;
+	opt.sub_port = DEFAULT_SUB_PORT;
+	opt.sub_username = NULL;
+	opt.database_names = (SimpleStringList)
+	{
+		NULL, NULL
+	};
+	opt.retain = false;
+	opt.recovery_timeout = 0;
+
+	/*
+	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
+	 * it either.
+	 */
+#ifndef WIN32
+	if (geteuid() == 0)
+	{
+		pg_log_error("cannot be executed by \"root\"");
+		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+						  progname);
+		exit(1);
+	}
+#endif
+
+	parse_command_option(argc, argv, &opt);
+
+	/* Create the output directory to store any data generated by this tool */
+	server_start_log = setup_server_logfile(opt.subscriber_dir);
+
+	restart_server(&opt, server_start_log);
+
+	verification_phase(&opt);
+
+	/* Register a function to clean up objects in case of failure */
+	atexit(cleanup_objects_atexit);
+
+	/*
+	 * Create the required objects for each database on publisher. This step
+	 * is here mainly because if we stop the standby we cannot verify if the
+	 * primary slot is in use. We could use an extra connection for it but it
+	 * doesn't seem worth.
+	 */
+	setup_publisher(dbinfo);
+
+	consistent_lsn = catchup_phase(&opt, server_start_log);
+
+	/*
+	 * Create the subscription for each database on subscriber. It does not
+	 * enable it immediately because it needs to adjust the logical
+	 * replication start point to the LSN reported by consistent_lsn (see
+	 * set_replication_progress). It also cleans up publications created by
+	 * this tool and replication to the standby.
+	 */
+	setup_subscriber(dbinfo, consistent_lsn);
+
+	cleanup_phase(&opt, server_start_log);
 
 	success = true;
-- 
2.43.0