I was working on adding the tar streaming functionality we talked about at
the developer meeting to pg_basebackup, and rapidly ran across the issue
that Andres has been complaining about for a while. The code in
receivelog.c just passes an insane number of parameters around. Adding or
changing even a small thing ends up touching a huge number of places.
Here's an attempt to refactor the code to instead pass around a control
structure. I think it's already a definite win, and we can't just keep
adding new functionality on top of the current code.
I'll continue working separately on the actual functionality that is meant
to go on top of this, but would appreciate an independent review of this
part. It's mostly mechanical, but there may well be mistakes
- or thinkos in the whole idea...
--
Magnus Hagander
Me: http://www.hagander.net/
Work: http://www.redpill-linpro.com/
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 372,381 **** typedef struct
static int
LogStreamerMain(logstreamer_param *param)
{
! if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
! param->sysidentifier, param->xlogdir,
! reached_end_position, standby_message_timeout,
! NULL, false, true))
/*
* Any errors will already have been reported in the function process,
--- 372,391 ----
static int
LogStreamerMain(logstreamer_param *param)
{
! StreamCtl stream;
!
! MemSet(&stream, 0, sizeof(stream));	/* MemSet(start, val, len): zero every field first */
! stream.startpos = param->startptr;
! stream.timeline = param->timeline;
! stream.sysidentifier = param->sysidentifier;
! stream.stream_stop = reached_end_position;
! stream.standby_message_timeout = standby_message_timeout;
! stream.synchronous = false;
! stream.mark_done = true;
! stream.basedir = param->xlogdir;
! stream.partial_suffix = NULL;	/* no suffix: segments are written under their final name */
!
! if (!ReceiveXlogStream(param->bgconn, &stream))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 276,285 **** FindStreamingStart(uint32 *tli)
static void
StreamLog(void)
{
! XLogRecPtr startpos,
! serverpos;
! TimeLineID starttli,
! servertli;
/*
* Connect in replication mode to the server
--- 276,286 ----
static void
StreamLog(void)
{
! XLogRecPtr serverpos;
! TimeLineID servertli;
! StreamCtl stream;
!
! MemSet(&stream, 0, sizeof(stream));
/*
* Connect in replication mode to the server
***************
*** 311,327 **** StreamLog(void)
/*
* Figure out where to start streaming.
*/
! startpos = FindStreamingStart(&starttli);
! if (startpos == InvalidXLogRecPtr)
{
! startpos = serverpos;
! starttli = servertli;
}
/*
* Always start streaming at the beginning of a segment
*/
! startpos -= startpos % XLOG_SEG_SIZE;
/*
* Start the replication
--- 312,328 ----
/*
* Figure out where to start streaming.
*/
! stream.startpos = FindStreamingStart(&stream.timeline);
! if (stream.startpos == InvalidXLogRecPtr)
{
! stream.startpos = serverpos;
! stream.timeline = servertli;
}
/*
* Always start streaming at the beginning of a segment
*/
! stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
/*
* Start the replication
***************
*** 329,340 **** StreamLog(void)
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! progname, (uint32) (startpos >> 32), (uint32) startpos,
! starttli);
! ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial",
! synchronous, false);
PQfinish(conn);
conn = NULL;
--- 330,346 ----
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
! progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
! stream.timeline);
!
! stream.stream_stop = stop_streaming;
! stream.standby_message_timeout = standby_message_timeout;
! stream.synchronous = synchronous;
! stream.mark_done = false;
! stream.basedir = basedir;
! stream.partial_suffix = ".partial";
! ReceiveXlogStream(conn, &stream);
PQfinish(conn);
conn = NULL;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 33,59 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */
! static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
! uint32 timeline, char *basedir,
! stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool synchronous, bool mark_done);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr *blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool mark_done);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool mark_done);
! static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
! uint32 timeline, char *basedir,
! stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos,
! bool mark_done);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
--- 33,50 ----
static bool still_sending = true; /* feedback still needs to be sent? */
! static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
! XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
! static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! XLogRecPtr *blockpos);
! static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! XLogRecPtr blockpos, XLogRecPtr *stoppos);
! static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
***************
*** 99,106 **** mark_file_as_archived(const char *basedir, const char *fname)
* partial_suffix) is stored in current_walfile_name.
*/
static bool
! open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
! char *partial_suffix)
{
int f;
char fn[MAXPGPATH];
--- 90,96 ----
* partial_suffix) is stored in current_walfile_name.
*/
static bool
! open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{
int f;
char fn[MAXPGPATH];
***************
*** 110,119 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
! XLogFileName(current_walfile_name, timeline, segno);
! snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
! partial_suffix ? partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
--- 100,109 ----
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
! XLogFileName(current_walfile_name, stream->timeline, segno);
! snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
! stream->partial_suffix ? stream->partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
***************
*** 185,191 **** open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
{
off_t currpos;
--- 175,181 ----
* and returns false, otherwise returns true.
*/
static bool
! close_walfile(StreamCtl *stream, XLogRecPtr pos)
{
off_t currpos;
***************
*** 220,232 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
/*
* If we finished writing a .partial file, rename it into place.
*/
! if (currpos == XLOG_SEG_SIZE && partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
! snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
! snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
--- 210,222 ----
/*
* If we finished writing a .partial file, rename it into place.
*/
! if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
! snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
! snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
***************
*** 234,243 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
return false;
}
}
! else if (partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
! progname, current_walfile_name, partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
--- 224,233 ----
return false;
}
}
! else if (stream->partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
! progname, current_walfile_name, stream->partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
***************
*** 245,254 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
! if (currpos == XLOG_SEG_SIZE && mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(basedir, current_walfile_name))
return false;
}
--- 235,244 ----
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
! if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(stream->basedir, current_walfile_name))
return false;
}
***************
*** 261,267 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* Check if a timeline history file exists.
*/
static bool
! existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
--- 251,257 ----
* Check if a timeline history file exists.
*/
static bool
! existsTimeLineHistoryFile(StreamCtl *stream)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
***************
*** 271,282 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
! if (tli == 1)
return true;
! TLHistoryFileName(histfname, tli);
! snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
--- 261,272 ----
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
! if (stream->timeline == 1)
return true;
! TLHistoryFileName(histfname, stream->timeline);
! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
***************
*** 294,301 **** existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
! writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
! char *content, bool mark_done)
{
int size = strlen(content);
char path[MAXPGPATH];
--- 284,290 ----
}
static bool
! writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{
int size = strlen(content);
char path[MAXPGPATH];
***************
*** 307,321 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
! TLHistoryFileName(histfname, tli);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! progname, tli, filename);
return false;
}
! snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
/*
* Write into a temp file name.
--- 296,310 ----
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
! TLHistoryFileName(histfname, stream->timeline);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
! progname, stream->timeline, filename);
return false;
}
! snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
/*
* Write into a temp file name.
***************
*** 375,384 **** writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
}
/* Maintain archive_status, check close_walfile() for details. */
! if (mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(basedir, histfname))
return false;
}
--- 364,373 ----
}
/* Maintain archive_status, check close_walfile() for details. */
! if (stream->mark_done)
{
/* writes error message if failed */
! if (!mark_file_as_archived(stream->basedir, histfname))
return false;
}
***************
*** 498,508 **** CheckServerVersionForStreaming(PGconn *conn)
* Note: The log position *must* be at a log segment start!
*/
bool
! ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! char *sysidentifier, char *basedir,
! stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix,
! bool synchronous, bool mark_done)
{
char query[128];
char slotcmd[128];
--- 487,493 ----
* Note: The log position *must* be at a log segment start!
*/
bool
! ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
char query[128];
char slotcmd[128];
***************
*** 539,545 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
slotcmd[0] = 0;
}
! if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
--- 524,530 ----
slotcmd[0] = 0;
}
! if (stream->sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
***************
*** 559,565 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
! if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
--- 544,550 ----
PQclear(res);
return false;
}
! if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
***************
*** 567,577 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
! if (timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
! progname, timeline);
PQclear(res);
return false;
}
--- 552,562 ----
PQclear(res);
return false;
}
! if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
! progname, stream->timeline);
PQclear(res);
return false;
}
***************
*** 582,588 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
! lastFlushPosition = startpos;
while (1)
{
--- 567,573 ----
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
! lastFlushPosition = stream->startpos;
while (1)
{
***************
*** 590,598 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Fetch the timeline history file for this timeline, if we don't have
* it already.
*/
! if (!existsTimeLineHistoryFile(basedir, timeline))
{
! snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
--- 575,583 ----
* Fetch the timeline history file for this timeline, if we don't have
* it already.
*/
! if (!existsTimeLineHistoryFile(stream))
{
! snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
***************
*** 615,624 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/* Write the history file to disk */
! writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
! PQgetvalue(res, 0, 1),
! mark_done);
PQclear(res);
}
--- 600,608 ----
}
/* Write the history file to disk */
! writeTimeLineHistoryFile(stream,
PQgetvalue(res, 0, 0),
! PQgetvalue(res, 0, 1));
PQclear(res);
}
***************
*** 627,640 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Before we start streaming from the requested location, check if the
* callback tells us to stop here.
*/
! if (stream_stop(startpos, timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
! (uint32) (startpos >> 32), (uint32) startpos,
! timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
--- 611,624 ----
* Before we start streaming from the requested location, check if the
* callback tells us to stop here.
*/
! if (stream->stream_stop(stream->startpos, stream->timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
! (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
! stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
***************
*** 646,654 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
/* Stream the WAL */
! res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
! standby_message_timeout, partial_suffix,
! &stoppos, synchronous, mark_done);
if (res == NULL)
goto error;
--- 630,636 ----
PQclear(res);
/* Stream the WAL */
! res = HandleCopyStream(conn, stream, &stoppos);
if (res == NULL)
goto error;
***************
*** 676,701 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
uint32 newtimeline;
bool parsed;
! parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
! if (newtimeline <= timeline)
{
fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! progname, newtimeline, timeline);
goto error;
}
! if (startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
! timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
goto error;
}
--- 658,683 ----
uint32 newtimeline;
bool parsed;
! parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
! if (newtimeline <= stream->timeline)
{
fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
! progname, newtimeline, stream->timeline);
goto error;
}
! if (stream->startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
! stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
! newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
goto error;
}
***************
*** 715,722 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment.
*/
! timeline = newtimeline;
! startpos = startpos - (startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
--- 697,704 ----
* Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment.
*/
! stream->timeline = newtimeline;
! stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
***************
*** 729,735 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
! if (stream_stop(stoppos, timeline, false))
return true;
else
{
--- 711,717 ----
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
! if (stream->stream_stop(stoppos, stream->timeline, false))
return true;
else
{
***************
*** 810,823 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
* On any other sort of error, returns NULL.
*/
static PGresult *
! HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, bool synchronous, bool mark_done)
{
char *copybuf = NULL;
int64 last_status = -1;
! XLogRecPtr blockpos = startpos;
still_sending = true;
--- 792,803 ----
* On any other sort of error, returns NULL.
*/
static PGresult *
! HandleCopyStream(PGconn *conn, StreamCtl *stream,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
! XLogRecPtr blockpos = stream->startpos;
still_sending = true;
***************
*** 830,838 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos,
! mark_done))
goto error;
now = feGetCurrentTimestamp();
--- 810,816 ----
/*
* Check if we should continue streaming, or abort at this point.
*/
! if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
now = feGetCurrentTimestamp();
***************
*** 841,847 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
! if (synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
--- 819,825 ----
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
! if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
***************
*** 863,871 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Potentially send a status message to the master
*/
! if (still_sending && standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
! standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
--- 841,849 ----
/*
* Potentially send a status message to the master
*/
! if (still_sending && stream->standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
! stream->standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
***************
*** 876,882 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Calculate how long send/receive loops should sleep
*/
! sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
last_status);
r = CopyStreamReceive(conn, sleeptime, ©buf);
--- 854,860 ----
/*
* Calculate how long send/receive loops should sleep
*/
! sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status);
r = CopyStreamReceive(conn, sleeptime, ©buf);
***************
*** 886,894 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
if (r == -2)
{
! PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
! basedir, partial_suffix,
! stoppos, mark_done);
if (res == NULL)
goto error;
--- 864,870 ----
goto error;
if (r == -2)
{
! PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
if (res == NULL)
goto error;
***************
*** 905,922 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
else if (copybuf[0] == 'w')
{
! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
! timeline, basedir, stream_stop,
! partial_suffix, mark_done))
goto error;
/*
* Check if we should continue streaming, or abort at this
* point.
*/
! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
! stream_stop, partial_suffix, stoppos,
! mark_done))
goto error;
}
else
--- 881,894 ----
}
else if (copybuf[0] == 'w')
{
! if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
/*
* Check if we should continue streaming, or abort at this
* point.
*/
! if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
}
else
***************
*** 1113,1122 **** ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
* Process XLogData message.
*/
static bool
! ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
! XLogRecPtr *blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, bool mark_done)
{
int xlogoff;
int bytes_left;
--- 1085,1092 ----
* Process XLogData message.
*/
static bool
! ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
! XLogRecPtr *blockpos)
{
int xlogoff;
int bytes_left;
***************
*** 1196,1203 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
if (walfile == -1)
{
! if (!open_walfile(*blockpos, timeline,
! basedir, partial_suffix))
{
/* Error logged by open_walfile */
return false;
--- 1166,1172 ----
if (walfile == -1)
{
! if (!open_walfile(stream, *blockpos))
{
/* Error logged by open_walfile */
return false;
***************
*** 1224,1236 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
! if (still_sending && stream_stop(*blockpos, timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
--- 1193,1205 ----
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(stream, *blockpos))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
! if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
***************
*** 1252,1260 **** ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
* Handle end of the copy stream.
*/
static PGresult *
! HandleEndOfCopyStream(PGconn *conn, char *copybuf,
! XLogRecPtr blockpos, char *basedir, char *partial_suffix,
! XLogRecPtr *stoppos, bool mark_done)
{
PGresult *res = PQgetResult(conn);
--- 1221,1228 ----
* Handle end of the copy stream.
*/
static PGresult *
! HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
! XLogRecPtr blockpos, XLogRecPtr *stoppos)
{
PGresult *res = PQgetResult(conn);
***************
*** 1265,1271 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/
if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Error message written in close_walfile() */
PQclear(res);
--- 1233,1239 ----
*/
if (still_sending)
{
! if (!close_walfile(stream, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
***************
*** 1295,1307 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf,
* Check if we should continue streaming, or abort at this point.
*/
static bool
! CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
! char *basedir, stream_stop_callback stream_stop,
! char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
{
! if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{
/* Potential error message is written by close_walfile */
return false;
--- 1263,1274 ----
* Check if we should continue streaming, or abort at this point.
*/
static bool
! CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
! XLogRecPtr *stoppos)
{
! if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
{
! if (!close_walfile(stream, blockpos))
{
/* Potential error message is written by close_walfile */
return false;
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 22,37 ****
*/
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn,
! XLogRecPtr startpos,
! uint32 timeline,
! char *sysidentifier,
! char *basedir,
! stream_stop_callback stream_stop,
! int standby_message_timeout,
! char *partial_suffix,
! bool synchronous,
! bool mark_done);
#endif /* RECEIVELOG_H */
--- 22,49 ----
*/
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
+ /*
+ * Control structure for one xlog streaming run; the caller fills it in and passes it to ReceiveXlogStream()
+ */
+ typedef struct
+ {
+ XLogRecPtr startpos; /* position to start streaming from; updated on timeline switch */
+ TimeLineID timeline; /* timeline to stream from; updated on timeline switch */
+ char *sysidentifier; /* if not NULL, must match the server's IDENTIFY_SYSTEM result */
+ int standby_message_timeout; /* periodic status messages are sent only when > 0 */
+ bool synchronous; /* fsync the WAL file as soon as unflushed data exists */
+ bool mark_done; /* mark completed segments and history files as archived */
+
+ stream_stop_callback stream_stop; /* callback; returning true stops streaming */
+
+ char *basedir; /* directory in which WAL and history files are written */
+ char *partial_suffix; /* suffix for incomplete segment files, or NULL for none */
+ } StreamCtl;
+
+
+
extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn,
! StreamCtl *stream);
#endif /* RECEIVELOG_H */
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers