Hi, here is the updated patch set (details inline).

On 13/04/17 20:00, Petr Jelinek wrote:
> Thanks for looking at this!
> 
> On 13/04/17 02:29, Andres Freund wrote:
>> Hi,
>> On 2017-03-03 01:30:11 +0100, Petr Jelinek wrote:
>>
>>> From 7d5b48c8cb80e7c867b2096c999d08feda50b197 Mon Sep 17 00:00:00 2001
>>> From: Petr Jelinek <pjmo...@pjmodos.net>
>>> Date: Fri, 24 Feb 2017 21:39:03 +0100
>>> Subject: [PATCH 1/5] Reserve global xmin for create slot snasphot export
>>>
>>> Otherwise the VACUUM or pruning might remove tuples still needed by the
>>> exported snapshot.
>>> ---
>>>  src/backend/replication/logical/logical.c | 21 ++++++++++++++++++++-
>>>  1 file changed, 20 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/src/backend/replication/logical/logical.c 
>>> b/src/backend/replication/logical/logical.c
>>> index 5529ac8..57c392c 100644
>>> --- a/src/backend/replication/logical/logical.c
>>> +++ b/src/backend/replication/logical/logical.c
>>> @@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin,
>>>      * the slot machinery about the new limit. Once that's done the
>>>      * ProcArrayLock can be released as the slot machinery now is
>>>      * protecting against vacuum.
>>> +    *
>>> +    * Note that we only store the global xmin temporarily in the in-memory
>>> +    * state so that the initial snapshot can be exported. After initial
>>> +    * snapshot is done global xmin should be reset and not tracked anymore
>>> +    * so we are fine with losing the global xmin after crash.
>>>      * ----
>>>      */
>>>     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
>>>  
>>>     slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
>>>     slot->data.catalog_xmin = slot->effective_catalog_xmin;
>>> +   slot->effective_xmin = slot->effective_catalog_xmin;
>>
>>
>>>  void
>>>  FreeDecodingContext(LogicalDecodingContext *ctx)
>>>  {
>>> +   ReplicationSlot *slot = MyReplicationSlot;
>>> +
>>>     if (ctx->callbacks.shutdown_cb != NULL)
>>>             shutdown_cb_wrapper(ctx);
>>>  
>>> +   /*
>>> +    * Cleanup global xmin for the slot that we may have set in
>>> +    * CreateInitDecodingContext().
>>
>> Hm.  Is that actually a meaningful point to do so? For one, it gets
>> called by pg_logical_slot_get_changes_guts(), but more importantly, the
>> snapshot is exported till SnapBuildClearExportedSnapshot(), which is the
>> next command?  If we rely on the snapshot magic done by ExportSnapshot()
>> it'd be worthwhile to mention that...
>>
> 
> (Didn't see the patch for couple of months so don't remember all the
> detailed decisions anymore)
> 
> Yes we rely on backend's xmin to be set for exported snapshot. We only
> care about global xmin for exported snapshots really, I assumed that's
> clear enough from "so that the initial snapshot can be exported" but I
> guess there should be more clear comment about this where we actually
> clean this up.
> 

Okay, I wrote a new comment there; how is it now?

>>
>>> We do not take ProcArrayLock or similar
>>> +    * since we only reset xmin here and there's not much harm done by a
>>> +    * concurrent computation missing that.
>>> +    */
>>
>> Hum.  I was prepared to complain about this, but ISTM, that there's
>> absolutely no risk because the following
>> ReplicationSlotsComputeRequiredXmin(false); actually does all the
>> necessary locking?  But still, I don't see much point in the
>> optimization.
>>
> 
> Well, if we don't need it in LogicalConfirmReceivedLocation, I don't see
> why we need it here. Please enlighten me.
> 

I kept this as it was. After rereading: ReplicationSlotsComputeRequiredXmin(false)
takes only a shared lock, while if we wanted to avoid the mutex and do the xmin
update under the lock we'd need an exclusive lock, so I think the optimization
is worth it...
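
Concretely, the cleanup in FreeDecodingContext() then boils down to this
(as in the 0001 patch below); only the slot's spinlock guards the reset,
and the recomputation takes the shared lock itself:

    SpinLockAcquire(&slot->mutex);
    slot->effective_xmin = InvalidTransactionId;
    SpinLockRelease(&slot->mutex);
    ReplicationSlotsComputeRequiredXmin(false);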

>>
>>
>>> This patch changes the code so that stored snapshots are only used for
>>> logical decoding restart but not for initial slot snapshot.
>>
>> Yea, that's a very good point...
>>
>>> @@ -1284,13 +1286,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, 
>>> XLogRecPtr lsn, xl_running_xacts *runn
>>>  
>>>             return false;
>>>     }
>>> -   /* c) valid on disk state */
>>> -   else if (SnapBuildRestore(builder, lsn))
>>> +   /* c) valid on disk state and not exported snapshot */
>>> +   else if (!TransactionIdIsNormal(builder->initial_xmin_horizon) &&
>>> +                    SnapBuildRestore(builder, lsn))
>>
>> Hm. Is this a good signaling mechanism? It'll also trigger for the SQL
>> interface, where it'd strictly speaking not be required, right?
>>
> 
> Good point. Maybe we should really tell snapshot builder if the snapshot
> is going to be exported or not explicitly (see the rant all the way down).
> 

I added the new signaling mechanism: a boolean option indicating whether we
are building a full snapshot, which is only set when the snapshot will be
exported or used by the current transaction.
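
For illustration, callers now pass the flag explicitly; e.g. the SQL
interface, which doesn't export the snapshot, passes false (from the 0002
patch below):

    ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
                                    false,      /* need_full_snapshot */
                                    logical_read_local_xlog_page,
                                    NULL, NULL);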

>>
>>> From 3318a929e691870f3c1ca665bec3bfa8ea2af2a8 Mon Sep 17 00:00:00 2001
>>> From: Petr Jelinek <pjmo...@pjmodos.net>
>>> Date: Sun, 26 Feb 2017 01:07:33 +0100
>>> Subject: [PATCH 3/5] Prevent snapshot builder xmin from going backwards
>>
>> A bit more commentary would be good. What does that protect us against?
>>
> 
> I think I explained that in the email. We might export snapshot with
> xmin smaller than global xmin otherwise.
> 

Updated commit message with explanation as well.

>>
>>
>>> From 53193b40f26dd19c712f3b9b77af55f81eb31cc4 Mon Sep 17 00:00:00 2001
>>> From: Petr Jelinek <pjmo...@pjmodos.net>
>>> Date: Wed, 22 Feb 2017 00:57:33 +0100
>>> Subject: [PATCH 4/5] Fix xl_running_xacts usage in snapshot builder
>>>
>>> Due to race condition, the xl_running_xacts might contain no longer
>>> running transactions. Previous coding tried to get around this by
>>> additional locking but that did not work correctly for committs. Instead
>>> try combining decoded commits and multiple xl_running_xacts to get the
>>> consistent snapshot.
>>
>> Needs more explanation about approach.
>>
>>
>>> @@ -1221,7 +1221,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
>>> lsn, xl_running_xacts *runn
>>>      *        simply track the number of in-progress toplevel transactions 
>>> and
>>>      *        lower it whenever one commits or aborts. When that number
>>>      *        (builder->running.xcnt) reaches zero, we can go from 
>>> FULL_SNAPSHOT
>>> -    *        to CONSISTENT.
>>> +    *        to CONSISTENT. Sometimes we might get xl_running_xacts which 
>>> has
>>> +    *        all tracked transactions as finished. We'll need to restart 
>>> tracking
>>> +    *        in that case and use previously collected committed 
>>> transactions to
>>> +    *        purge transactions mistakenly marked as running in the
>>> +    *        xl_running_xacts which exist as a result of race condition in
>>> +    *        LogStandbySnapshot().
>>
>> I'm not following this yet.
>>
> 
> Let me try to explain:
> We get xl_running_xacts with txes 1,3,4. But the 1 already committed
> before so the decoding will never see it and we never get snapshot. Now
> at some point we might get xl_running_xact with txes 6,7,8 so we know
> that all transactions from the initial xl_running_xacts must be closed.
> We restart the tracking here from beginning as if this was first
> xl_running_xacts we've seen, with the exception that we look into past
> if we seen the 6,7,8 transactions already as finished, then we'll mark
> them as finished immediately (hence avoiding the issue with transaction
> 6 being already committed before xl_running_xacts was written).
> 
>>
>>> @@ -1298,11 +1303,17 @@ SnapBuildFindSnapshot(SnapBuild *builder, 
>>> XLogRecPtr lsn, xl_running_xacts *runn
>>>      * b) first encounter of a useable xl_running_xacts record. If we had
>>>      * found one earlier we would either track running transactions (i.e.
>>>      * builder->running.xcnt != 0) or be consistent (this function wouldn't
>>> -    * get called).
>>> +    * get called). However it's possible that we could not see all
>>> +    * transactions that were marked as running in xl_running_xacts, so if
>>> +    * we get new one that says all were closed but we are not consistent
>>> +    * yet, we need to restart the tracking while taking previously seen
>>> +    * transactions into account.
>>
>> This needs to revise the preceding comment more heavily.  "This is the
>> first!!! Or maybe not!" isn't easy to understand.
>>
> 
> Yeah, I found it bit hard to make it sound correct and not confusing,
> even wondered if I should split this code to two because of that but it
> would lead into quite a bit of code duplication, dunno if that's better.
> Maybe we could move the "reset" code into separate function to avoid
> most of the duplication.

Rewrote and moved this comment to its own thing.
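
To make the scenario concrete, the new code path in 0004 condenses to this
(names as in the patch below):

    /*
     * xl_running_xacts listed xids 1,3,4, but xid 1 committed before the
     * record was written, so decoding never sees its commit and we would
     * wait forever.  A later xl_running_xacts listing 6,7,8 proves that
     * all of 1,3,4 are gone, so restart tracking and replay the commits
     * we have already decoded.
     */
    else if (TransactionIdFollows(running->oldestRunningXid,
                                  builder->running.xmax))
    {
        int     off;

        SnapBuildStartXactTracking(builder, running); /* reset as if first */

        /* mark transactions we already saw commit as finished */
        for (off = 0; off < builder->committed.xcnt; off++)
            SnapBuildEndTxn(builder, lsn, builder->committed.xip[off]);
    }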

> 
>>
>>>      */
>>> -   else if (!builder->running.xcnt)
>>> +   else if (!builder->running.xcnt ||
>>> +                    running->oldestRunningXid > builder->running.xmax)
>>
>> Isn't that wrong under wraparound?
>>
> 
> Right, should use TransactionIdFollows.
> 

Fixed.
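
For the archives: TransactionIdFollows does the circular (modulo 2^32)
comparison that a plain ">" lacks; roughly, from
src/backend/access/transam/transam.c:

    bool
    TransactionIdFollows(TransactionId id1, TransactionId id2)
    {
        int32       diff = (int32) (id1 - id2);

        return (diff > 0);
    }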

>>
>>>     {
>>>             int                     off;
>>> +           bool            first = builder->running.xcnt == 0;
>>>  
>>>             /*
>>>              * We only care about toplevel xids as those are the ones we
>>> @@ -1338,20 +1349,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, 
>>> XLogRecPtr lsn, xl_running_xacts *runn
>>>             builder->running.xmin = builder->running.xip[0];
>>>             builder->running.xmax = builder->running.xip[running->xcnt - 1];
>>>  
>>> +
>>>             /* makes comparisons cheaper later */
>>>             TransactionIdRetreat(builder->running.xmin);
>>>             TransactionIdAdvance(builder->running.xmax);
>>>  
>>>             builder->state = SNAPBUILD_FULL_SNAPSHOT;
>>>  
>>> -           ereport(LOG,
>>> -                   (errmsg("logical decoding found initial starting point 
>>> at %X/%X",
>>> -                                   (uint32) (lsn >> 32), (uint32) lsn),
>>> -                    errdetail_plural("%u transaction needs to finish.",
>>> -                                                     "%u transactions need 
>>> to finish.",
>>> -                                                     builder->running.xcnt,
>>> -                                                     (uint32) 
>>> builder->running.xcnt)));
>>> -
>>> +           /*
>>> +            * If this is the first time we've seen xl_running_xacts, we 
>>> are done.
>>> +            */
>>> +           if (first)
>>> +           {
>>> +                   ereport(LOG,
>>> +                           (errmsg("logical decoding found initial 
>>> starting point at %X/%X",
>>> +                                           (uint32) (lsn >> 32), (uint32) 
>>> lsn),
>>> +                            errdetail_plural("%u transaction needs to 
>>> finish.",
>>> +                                                             "%u 
>>> transactions need to finish.",
>>> +                                                             
>>> builder->running.xcnt,
>>> +                                                             (uint32) 
>>> builder->running.xcnt)));
>>> +           }
>>> +           else
>>> +           {
>>> +                   /*
>>> +                    * Because of the race condition in 
>>> LogStandbySnapshot() the
>>> +                    * transactions recorded in xl_running_xacts as running 
>>> might have
>>> +                    * already committed by the time the xl_running_xacts 
>>> was written
>>> +                    * to WAL. Use the information about decoded 
>>> transactions that we
>>> +                    * gathered so far to update our idea about what's 
>>> still running.
>>> +                    *
>>> +                    * We can use SnapBuildEndTxn directly as it only does 
>>> the
>>> +                    * transaction running check and handling without any 
>>> additional
>>> +                    * side effects.
>>> +                    */
>>> +                   for (off = 0; off < builder->committed.xcnt; off++)
>>> +                           SnapBuildEndTxn(builder, lsn, 
>>> builder->committed.xip[off]);
>>> +                   if (builder->state == SNAPBUILD_CONSISTENT)
>>> +                           return false;
>>> +
>>> +                   ereport(LOG,
>>> +                           (errmsg("logical decoding moved initial 
>>> starting point to %X/%X",
>>> +                                           (uint32) (lsn >> 32), (uint32) 
>>> lsn),
>>> +                            errdetail_plural("%u transaction needs to 
>>> finish.",
>>> +                                                             "%u 
>>> transactions need to finish.",
>>> +                                                             
>>> builder->running.xcnt,
>>> +                                                             (uint32) 
>>> builder->running.xcnt)));
>>> +           }
>>
>> Hm, this is not pretty.
>>
> 

Changed this whole thing into two different code paths, with a common
function doing the shared work.

> 
>>
>>> From 4217da872e9aa48750c020542d8bc22c863a3d75 Mon Sep 17 00:00:00 2001
>>> From: Petr Jelinek <pjmo...@pjmodos.net>
>>> Date: Tue, 21 Feb 2017 19:58:18 +0100
>>> Subject: [PATCH 5/5] Skip unnecessary snapshot builds
>>>
>>> When doing initial snapshot build during logical decoding
>>> initialization, don't build snapshots for transactions where we know the
>>> transaction didn't do any catalog changes. Otherwise we might end up
>>> with thousands of useless snapshots on busy server which can be quite
>>> expensive.
>>> ---
>>>  src/backend/replication/logical/snapbuild.c | 82 
>>> +++++++++++++++++++----------
>>>  1 file changed, 53 insertions(+), 29 deletions(-)
>>>
>>> diff --git a/src/backend/replication/logical/snapbuild.c 
>>> b/src/backend/replication/logical/snapbuild.c
>>> index 1a1c9ba..c800aa5 100644
>>> --- a/src/backend/replication/logical/snapbuild.c
>>> +++ b/src/backend/replication/logical/snapbuild.c
>>> @@ -954,6 +954,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
>>> TransactionId xid,
>>>     bool            forced_timetravel = false;
>>>     bool            sub_needs_timetravel = false;
>>>     bool            top_needs_timetravel = false;
>>> +   bool            skip_forced_snapshot = false;
>>>  
>>>     TransactionId xmax = xid;
>>>  
>>> @@ -975,10 +976,19 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr 
>>> lsn, TransactionId xid,
>>>             /*
>>>              * We could avoid treating !SnapBuildTxnIsRunning transactions 
>>> as
>>>              * timetravel ones, but we want to be able to export a snapshot 
>>> when
>>> -            * we reached consistency.
>>> +            * we reached consistency so we need to keep track of them.
>>>              */
>>>             forced_timetravel = true;
>>>             elog(DEBUG1, "forced to assume catalog changes for xid %u 
>>> because it was running too early", xid);
>>> +
>>> +           /*
>>> +            * It is however desirable to skip building new snapshot for
>>> +            * !SnapBuildTxnIsRunning transactions as otherwise we might 
>>> end up
>>> +            * building thousands of unused snapshots on busy servers which 
>>> can
>>> +            * be very expensive.
>>> +            */
>>> +           if (!SnapBuildTxnIsRunning(builder, xid))
>>> +                   skip_forced_snapshot = true;
>>>     }
>>
>> That's pretty crudely bolted on the existing logic, isn't there a
>> simpler way?
>>
> 
> Agreed, however, every time I tried to make this prettier I ended up
> either producing subtle bugs (see the initial email in this thread for
> example of that), so I eventually gave up on pretty.
> 

Okay, I gave it one more try with a fresh head, hopefully without new bugs.
What do you think?

> As a side note, my opinion after all this is that it's probably mistake
> to try to use various situational conditions to make sure we are
> building exportable snapshot. ISTM we should tell the snapbuilder
> explicitly that the snapshot will be exported and it should behave
> accordingly based on that. Because for example, we also should track
> aborted transactions in the snapshot which is to be exported because
> otherwise enough of them happening during snapshot building will result
> in export failure due to too big snapshot. But this seems like too
> invasive change to be back-portable.
> 

I ended up doing this in 0002 and also using those changes in 0005; it does
not seem to be that bad.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 073dfa48f2361b8ee6a656bcbe57d11cad4cc2b3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Fri, 24 Feb 2017 21:39:03 +0100
Subject: [PATCH 1/5] Reserve global xmin for create slot snapshot export

Otherwise VACUUM or pruning might remove tuples still needed by the
exported snapshot.
---
 src/backend/replication/logical/logical.c | 27 ++++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 5529ac8..58e1c80 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -267,12 +267,18 @@ CreateInitDecodingContext(char *plugin,
         * the slot machinery about the new limit. Once that's done the
         * ProcArrayLock can be released as the slot machinery now is
         * protecting against vacuum.
+        *
+        * Note that we only store the global xmin temporarily in the in-memory
+        * state so that the initial snapshot can be exported. After the initial
+        * snapshot is done the global xmin should be reset and not tracked
+        * anymore, so we are fine with losing the global xmin after a crash.
         * ----
         */
        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
        slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
        slot->data.catalog_xmin = slot->effective_catalog_xmin;
+       slot->effective_xmin = slot->effective_catalog_xmin;
 
        ReplicationSlotsComputeRequiredXmin(true);
 
@@ -282,7 +288,7 @@ CreateInitDecodingContext(char *plugin,
         * tell the snapshot builder to only assemble snapshot once reaching the
         * running_xact's record with the respective xmin.
         */
-       xmin_horizon = slot->data.catalog_xmin;
+       xmin_horizon = slot->effective_xmin;
 
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
@@ -456,9 +462,28 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 void
 FreeDecodingContext(LogicalDecodingContext *ctx)
 {
+       ReplicationSlot *slot = MyReplicationSlot;
+
        if (ctx->callbacks.shutdown_cb != NULL)
                shutdown_cb_wrapper(ctx);
 
+       /*
+        * Cleanup the global xmin for the slot that we may have set in
+        * CreateInitDecodingContext(). It's okay to do this here unconditionally
+        * because we only care about the global xmin for exported snapshots, and
+        * if we exported one we used the required xmin for the current backend
+        * process in SnapBuildInitialSnapshot().
+        *
+        * We do not take ProcArrayLock or similar since we only reset xmin here
+        * and there's not much harm done by a concurrent computation missing
+        * that; ReplicationSlotsComputeRequiredXmin will do the locking as
+        * necessary.
+        */
+       SpinLockAcquire(&slot->mutex);
+       slot->effective_xmin = InvalidTransactionId;
+       SpinLockRelease(&slot->mutex);
+       ReplicationSlotsComputeRequiredXmin(false);
+
        ReorderBufferFree(ctx->reorder);
        FreeSnapshotBuilder(ctx->snapshot_builder);
        XLogReaderFree(ctx->reader);
-- 
2.7.4

From 163ae827097b5c2956fafa6c6884b58e3a53ae3b Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 21 Feb 2017 20:14:44 +0100
Subject: [PATCH 2/5] Don't use on-disk snapshots for snapshot export in
 logical decoding

We store historical snapshots on disk to enable continuation of logical
decoding after a restart. These snapshots were also used by the slot
initialization code for the initial snapshot that the slot exports to aid
synchronization of the data copy with the stream consumption. However,
these snapshots are only useful for catalogs, not for normal user tables.
So when we exported such a snapshot for the user to read data from tables
consistent with a specific LSN of slot creation, the user would instead
read wrong data.

This patch changes the code so that stored snapshots are not used when
slot creation needs a full snapshot.
---
 src/backend/replication/logical/logical.c   | 10 +++++++---
 src/backend/replication/logical/snapbuild.c | 19 +++++++++++++------
 src/backend/replication/slotfuncs.c         |  2 +-
 src/backend/replication/walsender.c         |  7 ++++++-
 src/include/replication/logical.h           |  1 +
 src/include/replication/snapbuild.h         |  3 ++-
 6 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/logical.c 
b/src/backend/replication/logical/logical.c
index 58e1c80..79c1dd7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -114,6 +114,7 @@ static LogicalDecodingContext *
 StartupDecodingContext(List *output_plugin_options,
                                           XLogRecPtr start_lsn,
                                           TransactionId xmin_horizon,
+                                          bool need_full_snapshot,
                                           XLogPageReadCB read_page,
                                           
LogicalOutputPluginWriterPrepareWrite prepare_write,
                                           LogicalOutputPluginWriterWrite 
do_write)
@@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options,
 
        ctx->reorder = ReorderBufferAllocate();
        ctx->snapshot_builder =
-               AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn);
+               AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
+                                                               
need_full_snapshot);
 
        ctx->reorder->private_data = ctx;
 
@@ -210,6 +212,7 @@ StartupDecodingContext(List *output_plugin_options,
 LogicalDecodingContext *
 CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
+                                                 bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  
LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  
LogicalOutputPluginWriterWrite do_write)
@@ -294,7 +297,8 @@ CreateInitDecodingContext(char *plugin,
        ReplicationSlotSave();
 
        ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
-                                                                read_page, 
prepare_write, do_write);
+                                                                
need_full_snapshot, read_page, prepare_write,
+                                                                do_write);
 
        /* call output plugin initialization callback */
        old_context = MemoryContextSwitchTo(ctx->context);
@@ -383,7 +387,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
        }
 
        ctx = StartupDecodingContext(output_plugin_options,
-                                                                start_lsn, 
InvalidTransactionId,
+                                                                start_lsn, 
InvalidTransactionId, false,
                                                                 read_page, 
prepare_write, do_write);
 
        /* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index 2279604..ada618d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,9 @@ struct SnapBuild
         */
        TransactionId initial_xmin_horizon;
 
+       /* Indicates if we are building a full snapshot or just a catalog one. */
+       bool            building_full_snapshot;
+
        /*
         * Snapshot that's valid to see the catalog state seen at this moment.
         */
@@ -281,7 +284,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr 
lsn);
 SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
                                                TransactionId xmin_horizon,
-                                               XLogRecPtr start_lsn)
+                                               XLogRecPtr start_lsn,
+                                               bool need_full_snapshot)
 {
        MemoryContext context;
        MemoryContext oldcontext;
@@ -308,6 +312,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 
        builder->initial_xmin_horizon = xmin_horizon;
        builder->start_decoding_at = start_lsn;
+       builder->building_full_snapshot = need_full_snapshot;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -1233,7 +1238,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, 
xl_running_xacts *runn
         *
         * a) There were no running transactions when the xl_running_xacts 
record
         *        was inserted, jump to CONSISTENT immediately. We might find 
such a
-        *        state we were waiting for b) and c).
+        *        state we were waiting for b) or c).
         *
         * b) Wait for all toplevel transactions that were running to end. We
         *        simply track the number of in-progress toplevel transactions 
and
@@ -1248,7 +1253,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, 
xl_running_xacts *runn
         *        at all.
         *
         * c) This (in a previous run) or another decoding slot serialized a
-        *        snapshot to disk that we can use.
+        *        snapshot to disk that we can use. We can't use this method 
for the
+        *        initial snapshot when slot is being created and needs full 
snapshot
+        *        for export or direct use.
         * ---
         */
 
@@ -1303,13 +1310,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
lsn, xl_running_xacts *runn
 
                return false;
        }
-       /* c) valid on disk state */
-       else if (SnapBuildRestore(builder, lsn))
+       /* c) valid on disk state and not full snapshot */
+       else if (!builder->building_full_snapshot &&
+                        SnapBuildRestore(builder, lsn))
        {
                /* there won't be any state to cleanup */
                return false;
        }
-
        /*
         * b) first encounter of a useable xl_running_xacts record. If we had
         * found one earlier we would either track running transactions (i.e.
diff --git a/src/backend/replication/slotfuncs.c 
b/src/backend/replication/slotfuncs.c
index 7104c94..9775735 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -132,7 +132,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
         * Create logical decoding context, to build the initial snapshot.
         */
        ctx = CreateInitDecodingContext(
-                                                                       
NameStr(*plugin), NIL,
+                                                                       
NameStr(*plugin), NIL, false,
                                                                        
logical_read_local_xlog_page, NULL, NULL);
 
        /* build initial snapshot, might take a while */
diff --git a/src/backend/replication/walsender.c 
b/src/backend/replication/walsender.c
index dbb10c7..2784d67 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -873,6 +873,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
        if (cmd->kind == REPLICATION_KIND_LOGICAL)
        {
                LogicalDecodingContext *ctx;
+               bool    need_full_snapshot = false;
 
                /*
                 * Do options check early so that we can bail before calling the
@@ -884,6 +885,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                                ereport(ERROR,
                                                
(errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
                                                                "must not be 
called inside a transaction")));
+
+                       need_full_snapshot = true;
                }
                else if (snapshot_action == CRS_USE_SNAPSHOT)
                {
@@ -906,9 +909,11 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                                ereport(ERROR,
                                                
(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
                                                                "must not be 
called in a subtransaction")));
+
+                       need_full_snapshot = true;
                }
 
-               ctx = CreateInitDecodingContext(cmd->plugin, NIL,
+               ctx = CreateInitDecodingContext(cmd->plugin, NIL, 
need_full_snapshot,
                                                                                
logical_read_xlog_page,
                                                                                
WalSndPrepareWrite, WalSndWriteData);
 
diff --git a/src/include/replication/logical.h 
b/src/include/replication/logical.h
index 7d6c88e..80f04c3 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -82,6 +82,7 @@ extern void CheckLogicalDecodingRequirements(void);
 
 extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                                                  List *output_plugin_options,
+                                                 bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  
LogicalOutputPluginWriterPrepareWrite prepare_write,
                                                  
LogicalOutputPluginWriterWrite do_write);
diff --git a/src/include/replication/snapbuild.h 
b/src/include/replication/snapbuild.h
index a8ae631..494751d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -54,7 +54,8 @@ struct xl_running_xacts;
 extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
-                                               TransactionId xmin_horizon, 
XLogRecPtr start_lsn);
+                                               TransactionId xmin_horizon, 
XLogRecPtr start_lsn,
+                                               bool need_full_snapshot);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
-- 
2.7.4

From ae60b52ae0ca96bc14169cd507f101fbb5dfdf52 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sun, 26 Feb 2017 01:07:33 +0100
Subject: [PATCH 3/5] Prevent snapshot builder xmin from going backwards

The logical decoding snapshot builder may encounter an xl_running_xacts
record with an older xmin than the builder's current xmin. This can happen
because LogStandbySnapshot() sometimes sees already committed transactions
as running (there is a difference between "running" in terms of WAL and in
terms of the ProcArray). When this happens we must make sure that the
snapshot builder's xmin does not go backwards, otherwise the resulting
snapshot would show some transactions as running even though they have
already committed.
---
 src/backend/replication/logical/snapbuild.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index ada618d..3e34f75 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1165,7 +1165,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, 
XLogRecPtr lsn, xl_running_xact
         * looking, it's correct and actually more efficient this way since we 
hit
         * fast paths in tqual.c.
         */
-       builder->xmin = running->oldestRunningXid;
+       if (TransactionIdFollowsOrEquals(running->oldestRunningXid, 
builder->xmin))
+               builder->xmin = running->oldestRunningXid;
 
        /* Remove transactions we don't need to keep track off anymore */
        SnapBuildPurgeCommittedTxn(builder);
-- 
2.7.4

From 1f9d3fe6f1fb9a9b39ea6bd9e1776a769fac8ea9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 22 Feb 2017 00:57:33 +0100
Subject: [PATCH 4/5] Fix xl_running_xacts usage in snapshot builder

Due to a race condition, xl_running_xacts might contain transactions that
are no longer running. The previous coding tried to get around this by
additional locking, but that did not work correctly for commits. Instead,
try combining decoded commits with multiple xl_running_xacts records to
reach a consistent snapshot.

This also reverts the changes made to GetRunningTransactionData() and
LogStandbySnapshot() by b89e151, as the additional locking does not help.
---
 src/backend/replication/logical/snapbuild.c | 195 ++++++++++++++++++----------
 src/backend/storage/ipc/procarray.c         |   5 +-
 src/backend/storage/ipc/standby.c           |  19 ---
 3 files changed, 130 insertions(+), 89 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index 3e34f75..d989576 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1220,6 +1220,82 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, 
XLogRecPtr lsn, xl_running_xact
                                                                                
  builder->last_serialized_snapshot);
 }
 
+/*
+ * Start tracking transactions based on the info we get from xl_running_xacts.
+ */
+static void
+SnapBuildStartXactTracking(SnapBuild *builder, xl_running_xacts *running)
+{
+       int                     off;
+
+       /*
+        * We only care about toplevel xids as those are the ones we
+        * definitely see in the wal stream. As snapbuild.c tracks committed
+        * instead of running transactions we don't need to know anything
+        * about uncommitted subtransactions.
+        */
+
+       /*
+        * Start with an xmin/xmax that's correct for future, when all the
+        * currently running transactions have finished. We'll update both
+        * while waiting for the pending transactions to finish.
+        */
+       builder->xmin = running->nextXid;               /* < are finished */
+       builder->xmax = running->nextXid;               /* >= are running */
+
+       /* so we can safely use the faster comparisons */
+       Assert(TransactionIdIsNormal(builder->xmin));
+       Assert(TransactionIdIsNormal(builder->xmax));
+
+       builder->running.xcnt = running->xcnt;
+       builder->running.xcnt_space = running->xcnt;
+       builder->running.xip =
+               MemoryContextAlloc(builder->context,
+                                                  builder->running.xcnt * 
sizeof(TransactionId));
+       memcpy(builder->running.xip, running->xids,
+                  builder->running.xcnt * sizeof(TransactionId));
+
+       /* sort so we can do a binary search */
+       qsort(builder->running.xip, builder->running.xcnt,
+                 sizeof(TransactionId), xidComparator);
+
+       builder->running.xmin = builder->running.xip[0];
+       builder->running.xmax = builder->running.xip[running->xcnt - 1];
+
+       /* makes comparisons cheaper later */
+       TransactionIdRetreat(builder->running.xmin);
+       TransactionIdAdvance(builder->running.xmax);
+
+       builder->state = SNAPBUILD_FULL_SNAPSHOT;
+
+       /*
+        * Iterate through all xids, wait for them to finish.
+        *
+        * This isn't required for the correctness of decoding, but to allow
+        * isolationtester to notice that we're currently waiting for
+        * something.
+        */
+       for (off = 0; off < builder->running.xcnt; off++)
+       {
+               TransactionId xid = builder->running.xip[off];
+
+               /*
+                * Upper layers should prevent that we ever need to wait on
+                * ourselves. Check anyway, since failing to do so would either
+                * result in an endless wait or an Assert() failure.
+                */
+               if (TransactionIdIsCurrentTransactionId(xid))
+                       elog(ERROR, "waiting for ourselves");
+
+               XactLockTableWait(xid, NULL, NULL, XLTW_None);
+       }
+}
 
 /*
  * Build the start of a snapshot that's capable of decoding the catalog.
@@ -1241,7 +1317,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
lsn, xl_running_xacts *runn
         *        was inserted, jump to CONSISTENT immediately. We might find 
such a
         *        state we were waiting for b) or c).
         *
-        * b) Wait for all toplevel transactions that were running to end. We
+        * b) This (in a previous run) or another decoding slot serialized a
+        *        snapshot to disk that we can use. We can't use this method for the
+        *        initial snapshot when a slot is being created and needs a full
+        *        snapshot for export or direct use.
+        *
+        * c) Wait for all toplevel transactions that were running to end. We
         *        simply track the number of in-progress toplevel transactions 
and
         *        lower it whenever one commits or aborts. When that number
         *        (builder->running.xcnt) reaches zero, we can go from 
FULL_SNAPSHOT
@@ -1252,11 +1333,6 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
lsn, xl_running_xacts *runn
         *        Interestingly, in contrast to HS, this allows us not to care 
about
         *        subtransactions - and by extension suboverflowed 
xl_running_xacts -
         *        at all.
-        *
-        * c) This (in a previous run) or another decoding slot serialized a
-        *        snapshot to disk that we can use. We can't use this method 
for the
-        *        initial snapshot when slot is being created and needs full 
snapshot
-        *        for export or direct use.
         * ---
         */
 
@@ -1311,7 +1387,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, 
xl_running_xacts *runn
 
                return false;
        }
-       /* c) valid on disk state and not full snapshot */
+       /* b) valid on disk state and not full snapshot */
        else if (!builder->building_full_snapshot &&
                         SnapBuildRestore(builder, lsn))
        {
@@ -1319,54 +1395,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
lsn, xl_running_xacts *runn
                return false;
        }
        /*
-        * b) first encounter of a useable xl_running_xacts record. If we had
+        * c) first encounter of a useable xl_running_xacts record. If we had
         * found one earlier we would either track running transactions (i.e.
         * builder->running.xcnt != 0) or be consistent (this function wouldn't
         * get called).
         */
        else if (!builder->running.xcnt)
        {
-               int                     off;
-
-               /*
-                * We only care about toplevel xids as those are the ones we
-                * definitely see in the wal stream. As snapbuild.c tracks 
committed
-                * instead of running transactions we don't need to know 
anything
-                * about uncommitted subtransactions.
-                */
-
-               /*
-                * Start with an xmin/xmax that's correct for future, when all 
the
-                * currently running transactions have finished. We'll update 
both
-                * while waiting for the pending transactions to finish.
-                */
-               builder->xmin = running->nextXid;               /* < are 
finished */
-               builder->xmax = running->nextXid;               /* >= are 
running */
-
-               /* so we can safely use the faster comparisons */
-               Assert(TransactionIdIsNormal(builder->xmin));
-               Assert(TransactionIdIsNormal(builder->xmax));
-
-               builder->running.xcnt = running->xcnt;
-               builder->running.xcnt_space = running->xcnt;
-               builder->running.xip =
-                       MemoryContextAlloc(builder->context,
-                                                          
builder->running.xcnt * sizeof(TransactionId));
-               memcpy(builder->running.xip, running->xids,
-                          builder->running.xcnt * sizeof(TransactionId));
-
-               /* sort so we can do a binary search */
-               qsort(builder->running.xip, builder->running.xcnt,
-                         sizeof(TransactionId), xidComparator);
-
-               builder->running.xmin = builder->running.xip[0];
-               builder->running.xmax = builder->running.xip[running->xcnt - 1];
-
-               /* makes comparisons cheaper later */
-               TransactionIdRetreat(builder->running.xmin);
-               TransactionIdAdvance(builder->running.xmax);
-
-               builder->state = SNAPBUILD_FULL_SNAPSHOT;
+               SnapBuildStartXactTracking(builder, running);
 
                ereport(LOG,
                        (errmsg("logical decoding found initial starting point 
at %X/%X",
@@ -1376,30 +1412,53 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr 
lsn, xl_running_xacts *runn
                                                          builder->running.xcnt,
                                                          (uint32) 
builder->running.xcnt)));
 
+               /* nothing could have built up so far, so don't perform cleanup */
+               return false;
+       }
+       /*
+        * d) We have already seen a usable xl_running_xacts and tried the
+        * above. However, because of the race condition in LogStandbySnapshot()
+        * a transaction might have been reported as running even though it had
+        * already written its commit record before the xl_running_xacts, so
+        * decoding has missed it. We now see an xl_running_xacts which suggests
+        * all transactions from the original one are closed, yet we have not
+        * reached the consistent state, which means the race condition has
+        * indeed happened.
+        *
+        * Start tracking again as if this were the first xl_running_xacts we
+        * have seen, with the advantage that, because decoding was already
+        * running, any transactions committed before the xl_running_xacts
+        * record are already known to us, so we won't hit the same issue again.
+        */
+       else if (TransactionIdFollows(running->oldestRunningXid,
+                                                                 
builder->running.xmax))
+       {
+               int off;
+
+               SnapBuildStartXactTracking(builder, running);
+
                /*
-                * Iterate through all xids, wait for them to finish.
+                * Mark any transactions that are known to have committed before the
+                * xl_running_xacts as finished to avoid the race condition in
+                * LogStandbySnapshot().
                 *
-                * This isn't required for the correctness of decoding, but to allow
-                * isolationtester to notice that we're currently waiting for
-                * something.
+                * We can use SnapBuildEndTxn directly as it only does the
+                * transaction running check and handling without any additional
+                * side effects.
                 */
-               for (off = 0; off < builder->running.xcnt; off++)
-               {
-                       TransactionId xid = builder->running.xip[off];
-
-                       /*
-                        * Upper layers should prevent that we ever need to 
wait on
-                        * ourselves. Check anyway, since failing to do so 
would either
-                        * result in an endless wait or an Assert() failure.
-                        */
-                       if (TransactionIdIsCurrentTransactionId(xid))
-                               elog(ERROR, "waiting for ourselves");
+               for (off = 0; off < builder->committed.xcnt; off++)
+                       SnapBuildEndTxn(builder, lsn, 
builder->committed.xip[off]);
 
-                       XactLockTableWait(xid, NULL, NULL, XLTW_None);
-               }
+               /* We might have reached consistent point now. */
+               if (builder->state == SNAPBUILD_CONSISTENT)
+                       return false;
 
-               /* nothing could have built up so far, so don't perform cleanup */
-               return false;
+               ereport(LOG,
+                               (errmsg("logical decoding moved initial starting point to %X/%X",
+                                               (uint32) (lsn >> 32), (uint32) lsn),
+                                errdetail_plural("%u transaction needs to finish.",
+                                                                 "%u transactions need to finish.",
+                                                                 builder->running.xcnt,
+                                                                 (uint32) builder->running.xcnt)));
        }
 
        /*
diff --git a/src/backend/storage/ipc/procarray.c 
b/src/backend/storage/ipc/procarray.c
index ebf6a92..b3d6829 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2060,12 +2060,13 @@ GetRunningTransactionData(void)
        CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
        CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
+       /* We don't release XidGenLock here, the caller is responsible for that */
+       LWLockRelease(ProcArrayLock);
+
        Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
        Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
        Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
 
-       /* We don't release the locks here, the caller is responsible for that */
-
        return CurrentRunningXacts;
 }
 
diff --git a/src/backend/storage/ipc/standby.c 
b/src/backend/storage/ipc/standby.c
index 8e57f93..ddb279e 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -929,27 +929,8 @@ LogStandbySnapshot(void)
         */
        running = GetRunningTransactionData();
 
-       /*
-        * GetRunningTransactionData() acquired ProcArrayLock, we must release 
it.
-        * For Hot Standby this can be done before inserting the WAL record
-        * because ProcArrayApplyRecoveryInfo() rechecks the commit status using
-        * the clog. For logical decoding, though, the lock can't be released
-        * early because the clog might be "in the future" from the POV of the
-        * historic snapshot. This would allow for situations where we're 
waiting
-        * for the end of a transaction listed in the xl_running_xacts record
-        * which, according to the WAL, has committed before the 
xl_running_xacts
-        * record. Fortunately this routine isn't executed frequently, and it's
-        * only a shared lock.
-        */
-       if (wal_level < WAL_LEVEL_LOGICAL)
-               LWLockRelease(ProcArrayLock);
-
        recptr = LogCurrentRunningXacts(running);
 
-       /* Release lock if we kept it longer ... */
-       if (wal_level >= WAL_LEVEL_LOGICAL)
-               LWLockRelease(ProcArrayLock);
-
        /* GetRunningTransactionData() acquired XidGenLock, we must release it 
*/
        LWLockRelease(XidGenLock);
 
-- 
2.7.4

From ff27b1fc7099fa668a9e28daa28bfee1ad9410bd Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 21 Feb 2017 19:58:18 +0100
Subject: [PATCH 5/5] Skip unnecessary snapshot builds

When doing the initial snapshot build during logical decoding
initialization, don't build snapshots for transactions that we know
didn't do any catalog changes. Otherwise we might end up with thousands
of useless snapshots on a busy server, which can be quite expensive.
---
 src/backend/replication/logical/snapbuild.c | 100 ++++++++++++++++------------
 1 file changed, 58 insertions(+), 42 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index d989576..916b297 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -975,9 +975,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
 {
        int                     nxact;
 
-       bool            forced_timetravel = false;
-       bool            sub_needs_timetravel = false;
-       bool            top_needs_timetravel = false;
+       bool            need_timetravel = false;
+       bool            need_snapshot = false;
 
        TransactionId xmax = xid;
 
@@ -997,12 +996,22 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
                        builder->start_decoding_at = lsn + 1;
 
                /*
-                * We could avoid treating !SnapBuildTxnIsRunning transactions 
as
-                * timetravel ones, but we want to be able to export a snapshot 
when
-                * we reached consistency.
+                * When building a full snapshot we need to keep track of all
+                * transactions.
                 */
-               forced_timetravel = true;
-               elog(DEBUG1, "forced to assume catalog changes for xid %u 
because it was running too early", xid);
+               if (builder->building_full_snapshot)
+               {
+                       need_timetravel = true;
+                       elog(DEBUG1, "forced to assume catalog changes for xid 
%u because it was running too early", xid);
+               }
+
+               /*
+                * If we could not observe the just finished transaction since it
+                * started (because it started before we started tracking), we'll
+                * always need a snapshot.
+                */
+               if (SnapBuildTxnIsRunning(builder, xid))
+                       need_snapshot = true;
        }
 
        for (nxact = 0; nxact < nsubxacts; nxact++)
@@ -1015,23 +1024,13 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
                SnapBuildEndTxn(builder, lsn, subxid);
 
                /*
-                * If we're forcing timetravel we also need visibility 
information
-                * about subtransaction, so keep track of subtransaction's 
state.
-                */
-               if (forced_timetravel)
-               {
-                       SnapBuildAddCommittedTxn(builder, subxid);
-                       if (NormalTransactionIdFollows(subxid, xmax))
-                               xmax = subxid;
-               }
-
-               /*
                 * Add subtransaction to base snapshot if it DDL, we don't 
distinguish
                 * to toplevel transactions there.
                 */
-               else if (ReorderBufferXidHasCatalogChanges(builder->reorder, 
subxid))
+               if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
                {
-                       sub_needs_timetravel = true;
+                       need_timetravel = true;
+                       need_snapshot = true;
 
                        elog(DEBUG1, "found subtransaction %u:%u with catalog 
changes.",
                                 xid, subxid);
@@ -1041,6 +1040,17 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
                        if (NormalTransactionIdFollows(subxid, xmax))
                                xmax = subxid;
                }
+               /*
+                * If we have already decided that timetravel is needed for this
+                * transaction, we also need visibility information about
+                * subtransaction, so keep track of subtransaction's state.
+                */
+               else if (need_timetravel)
+               {
+                       SnapBuildAddCommittedTxn(builder, subxid);
+                       if (NormalTransactionIdFollows(subxid, xmax))
+                               xmax = subxid;
+               }
        }
 
        /*
@@ -1049,29 +1059,27 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
         */
        SnapBuildEndTxn(builder, lsn, xid);
 
-       if (forced_timetravel)
-       {
-               elog(DEBUG2, "forced transaction %u to do timetravel.", xid);
-
-               SnapBuildAddCommittedTxn(builder, xid);
-       }
-       /* add toplevel transaction to base snapshot */
-       else if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+       /*
+        * Add the toplevel transaction to the base snapshot if it made any
+        * catalog changes...
+        */
+       if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
        {
                elog(DEBUG2, "found top level transaction %u, with catalog 
changes!",
                         xid);
 
-               top_needs_timetravel = true;
+               need_timetravel = true;
+               need_snapshot = true;
                SnapBuildAddCommittedTxn(builder, xid);
        }
-       else if (sub_needs_timetravel)
+       /* ... or if previous checks decided we need timetravel anyway. */
+       else if (need_timetravel)
        {
-               /* mark toplevel txn as timetravel as well */
                SnapBuildAddCommittedTxn(builder, xid);
        }
 
        /* if there's any reason to build a historic snapshot, do so now */
-       if (forced_timetravel || top_needs_timetravel || sub_needs_timetravel)
+       if (need_timetravel)
        {
                /*
                 * Adjust xmax of the snapshot builder, we only do that for 
committed,
@@ -1092,15 +1100,25 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
                if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
                        return;
 
+               /* We always need to build a snapshot if there isn't one yet. */
+               need_snapshot = need_snapshot || !builder->snapshot;
+
                /*
-                * Decrease the snapshot builder's refcount of the old 
snapshot, note
-                * that it still will be used if it has been handed out to the
-                * reorderbuffer earlier.
+                * Decrease the snapshot builder's refcount of the old snapshot 
if we
+                * plan to build new one, note that it still will be used if it 
has
+                * been handed out to the reorderbuffer earlier.
                 */
-               if (builder->snapshot)
+               if (builder->snapshot && need_snapshot)
                        SnapBuildSnapDecRefcount(builder->snapshot);
 
-               builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+               /* Build the new snapshot if one is needed. */
+               if (need_snapshot)
+               {
+                       builder->snapshot = SnapBuildBuildSnapshot(builder, 
xid);
+
+                       /* refcount of the snapshot builder for the new 
snapshot */
+                       SnapBuildSnapIncRefcount(builder->snapshot);
+               }
 
                /* we might need to execute invalidations, add snapshot */
                if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1110,11 +1128,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
                                                                                
 builder->snapshot);
                }
 
-               /* refcount of the snapshot builder for the new snapshot */
-               SnapBuildSnapIncRefcount(builder->snapshot);
-
                /* add a new Snapshot to all currently running transactions */
-               SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+               if (need_snapshot)
+                       SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
        }
        else
        {
-- 
2.7.4
