On Wed, May 13, 2020 at 9:16 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
>
> On Wed, May 13, 2020 at 4:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > > >
> > > > 4.
> > > > +static void
> > > > +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > > > +{
> > > > + LogicalDecodingContext *ctx = cache->private_data;
> > > > + LogicalErrorCallbackState state;
> > > > + ErrorContextCallback errcallback;
> > > > +
> > > > + Assert(!ctx->fast_forward);
> > > > +
> > > > + /* We're only supposed to call this when streaming is supported. */
> > > > + Assert(ctx->streaming);
> > > > +
> > > > + /* Push callback + info on the error context stack */
> > > > + state.ctx = ctx;
> > > > + state.callback_name = "stream_start";
> > > > + /* state.report_location = apply_lsn; */
> > > >
> > > > Why can't we supply the report_location here?  I think here we need to
> > > > report txn->first_lsn if this is the very first stream and
> > > > txn->final_lsn if it is any consecutive one.
> > >
> > > I am not sure about this,  Because for the very first stream we will
> > > report the location of the first lsn of the stream and for the
> > > consecutive stream we will report the last lsn in the stream.
> > >
> >
> > Yeah, that doesn't seem to be consistent.  How about if get it as an
> > additional parameter?  The caller can pass the lsn of the very first
> > change it is trying to decode in this stream.
>
> Hmm, I think we need to call ReorderBufferIterTXNInit and
> ReorderBufferIterTXNNext and get the first change of the stream after
> that we shall call stream start then we can find out the first LSN of
> the stream.   I will see how to do so that it doesn't look awkward.
> Basically, as of now, our code is of this layout.
>
> 1. stream_start;
> 2. ReorderBufferIterTXNInit(rb, txn, &iterstate);
>    while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
>    {
>        stream changes
>    }
> 3. stream stop
>
> So if we want to know the first lsn of this stream then we shall do
> something like this
>
> 1. ReorderBufferIterTXNInit(rb, txn, &iterstate);
>    while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
>    {
> 2.     if first_change
>            stream_start;
>
>        stream changes
>    }
> 3. stream stop
>
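
To make the reordered layout concrete, here is a minimal standalone sketch of the "start the stream lazily on the first change" idea. It is only an illustration under stated assumptions: every name in it (SketchChange, SketchStreamLog, sketch_stream_txn, and so on) is a hypothetical stand-in, not a PostgreSQL type or function, and the real iterator and callbacks are replaced by an array walk and counters. It also stops the stream only if one was started, which is a choice made for the sketch rather than something settled in this thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL definitions. */
typedef uint64_t SketchLSN;

typedef struct SketchChange
{
	SketchLSN	lsn;			/* WAL position of this change */
} SketchChange;

typedef struct SketchStreamLog
{
	int			start_calls;	/* how often stream start was invoked */
	int			stop_calls;		/* how often stream stop was invoked */
	int			changes_sent;	/* number of changes streamed */
	SketchLSN	start_lsn;		/* LSN passed to the last stream start */
} SketchStreamLog;

/* Stand-in for the stream start callback; records the reported LSN. */
static void
sketch_stream_start(SketchStreamLog *log, SketchLSN first_lsn)
{
	log->start_calls++;
	log->start_lsn = first_lsn;
}

/* Stand-in for the stream stop callback. */
static void
sketch_stream_stop(SketchStreamLog *log)
{
	log->stop_calls++;
}

/*
 * Walk the changes of one stream.  Stream start is deferred until the
 * first change has been fetched, so that change's LSN can be passed along
 * as the report location.
 */
static void
sketch_stream_txn(const SketchChange *changes, size_t nchanges,
				  SketchStreamLog *log)
{
	bool		stream_started = false;

	assert(log != NULL);

	for (size_t i = 0; i < nchanges; i++)
	{
		if (!stream_started)
		{
			sketch_stream_start(log, changes[i].lsn);
			stream_started = true;
		}

		/* "stream changes" step from the layout above */
		log->changes_sent++;
	}

	/* Sketch-only choice: skip stream stop when nothing was streamed. */
	if (stream_started)
		sketch_stream_stop(log);
}
```

With three changes at LSNs 100, 108 and 120, the sketch calls stream start once with 100 and stream stop once; with no changes it calls neither.
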

Yeah, something like that would work.  I think you only need to check
whether it is the first change in 'streaming' mode.

> > > >
> > > > 11.
> > > > - * HeapTupleSatisfiesHistoricMVCC.
> > > > + * tqual.c's HeapTupleSatisfiesHistoricMVCC.
> > > > + *
> > > > + * We do build the hash table even if there are no CIDs. That's
> > > > + * because when streaming in-progress transactions we may run into
> > > > + * tuples with the CID before actually decoding them. Think e.g. about
> > > > + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
> > > > + * yet when applying the INSERT. So we build a hash table so that
> > > > + * ResolveCminCmaxDuringDecoding does not segfault in this case.
> > > > + *
> > > > + * XXX We might limit this behavior to streaming mode, and just bail
> > > > + * out when decoding transaction at commit time (at which point it's
> > > > + * guaranteed to see all CIDs).
> > > >   */
> > > >  static void
> > > >  ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > > @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > > >   dlist_iter iter;
> > > >   HASHCTL hash_ctl;
> > > >
> > > > - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
> > > > - return;
> > > > -
> > > >
> > > > I don't understand this change.  Why would "INSERT followed by
> > > > TRUNCATE" could lead to a tuple which can come for decode before its
> > > > CID?
> > >
> > > Actually, even if we haven't decoded the DDL operation but in the
> > > actual system table the tuple might have been deleted from the next
> > > operation.  e.g. while we are streaming the INSERT it is possible that
> > > the truncate has already deleted that tuple and set the max for the
> > > tuple.  So before streaming patch, we were only streaming the INSERT
> > > only on commit so by that time we had got all the operation which has
> > > done DDL and we would have already prepared tuple CID hash.
> > >
> >
> > Okay, but I think for that case how good is that we always allow CID
> > hash table to be built even if there are no catalog changes in TXN
> > (see changes in ReorderBufferBuildTupleCidHash).  Can't we detect that
> > while resolving the cmin/cmax?
>
> Maybe in ResolveCminCmaxDuringDecoding we can see if tuplecid_data is
> NULL then we can return as unresolved and then caller can take a call
> based on that.
>

Yeah, and add appropriate comments about why we are doing so and in
what kind of scenario that can happen.

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com
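
For reference, the "return as unresolved when tuplecid_data is NULL" idea discussed above could look roughly like the standalone sketch below. This is an assumption-laden illustration, not the patch: SketchCidMap, SketchCidEntry and sketch_resolve_cmin_cmax are hypothetical names, the hash table is replaced by a small array with a linear search, and a tuple is identified by a plain integer key instead of its real identity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL definitions. */
typedef uint32_t SketchCommandId;

typedef struct SketchCidEntry
{
	uint32_t	tuple_key;		/* simplified tuple identity */
	SketchCommandId cmin;
	SketchCommandId cmax;
} SketchCidEntry;

typedef struct SketchCidMap
{
	const SketchCidEntry *entries;
	size_t		nentries;
} SketchCidMap;

/*
 * Look up cmin/cmax for a tuple.  Returns false ("unresolved") when the
 * lookup table was never built, or when the tuple has no entry, instead of
 * dereferencing a NULL table.  The caller decides how to proceed.
 *
 * Scenario this guards against: while streaming an in-progress
 * transaction, a tuple can be examined before the catalog change that
 * affects it has been decoded, so no table may exist yet.
 */
static bool
sketch_resolve_cmin_cmax(const SketchCidMap *tuplecid_data,
						 uint32_t tuple_key,
						 SketchCommandId *cmin,
						 SketchCommandId *cmax)
{
	assert(cmin != NULL && cmax != NULL);

	/* No table built for this transaction: report as unresolved. */
	if (tuplecid_data == NULL)
		return false;

	for (size_t i = 0; i < tuplecid_data->nentries; i++)
	{
		const SketchCidEntry *entry = &tuplecid_data->entries[i];

		if (entry->tuple_key == tuple_key)
		{
			*cmin = entry->cmin;
			*cmax = entry->cmax;
			return true;
		}
	}

	/* Table exists but has no entry for this tuple. */
	return false;
}
```

The point of the design is that the table can stay unbuilt for transactions without catalog changes, and the NULL case is handled at lookup time with a comment explaining when it can occur.
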