Re: Rework LogicalOutputPluginWriterUpdateProgress

2024-01-15 Thread vignesh C
On Mon, 13 Mar 2023 at 08:17, wangw.f...@fujitsu.com
 wrote:
>
> On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 
>  wrote:
> > Hi,
> >
> >
> > On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 
> > wrote:
> > > Attach the new patch set.
> > Thanks for updating the patch ! One review comment on v7-0005.
>
> Thanks for your comment.
>
> > stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
> > threshold check and UpdateProgressAndKeepalive unlike other write wrapper
> > functions like below. But, both of them write some data to the output
> > plugin, set the flag of did_write and thus it updates the subscriber's
> > last_recv_timestamp used for timeout check in LogicalRepApplyLoop. So, it
> > looks adding the pair to both functions can be more accurate, in order to
> > reset the counter in changes_count on the publisher ?
> >
> > @@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> >
> > /* Pop the error context stack */
> > error_context_stack = errcallback.previous;
> > +
> > +   /* No progress has been made, so don't call UpdateProgressAndKeepalive */
> >  }
>
> Since I think stream_start/stop_cb are different from change_cb, they don't
> represent records in WAL, so I think the LSNs corresponding to these two
> messages are the LSNs of other records. So, we don't call the function
> UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
> didn't reset the counter here.

As there has been no activity in this thread and there does not seem to
have been much interest in this over the last 9 months, I have changed the
status of the patch to "Returned with Feedback".

Regards,
Vignesh




RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-12 Thread wangw.f...@fujitsu.com
On Fri, Mar 10, 2023 20:17 PM Osumi, Takamichi/大墨 昂道 
 wrote:
> Hi,
> 
> 
> On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威 
> wrote:
> > Attach the new patch set.
> Thanks for updating the patch ! One review comment on v7-0005.

Thanks for your comment.

> stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
> threshold check and UpdateProgressAndKeepalive unlike other write wrapper
> functions like below. But, both of them write some data to the output plugin,
> set the flag of did_write and thus it updates the subscriber's
> last_recv_timestamp used for timeout check in LogicalRepApplyLoop. So, it
> looks adding the pair to both functions can be more accurate, in order to
> reset the counter in changes_count on the publisher ?
> 
> @@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> /* Pop the error context stack */
> error_context_stack = errcallback.previous;
> +
> +   /* No progress has been made, so don't call UpdateProgressAndKeepalive */
>  }

Since I think stream_start/stop_cb are different from change_cb, they don't
represent records in WAL, so I think the LSNs corresponding to these two
messages are the LSNs of other records. So, we don't call the function
UpdateProgressAndKeepalive here. Also, for the reasons described in [1].#05, I
didn't reset the counter here.

[1] - https://www.postgresql.org/message-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread Takamichi Osumi (Fujitsu)
Hi,


On Friday, March 10, 2023 6:32 PM Wang, Wei/王 威  wrote:
> Attach the new patch set.
Thanks for updating the patch! I have one review comment on v7-0005.

stream_start_cb_wrapper and stream_stop_cb_wrapper don't call the pair of
threshold check and UpdateProgressAndKeepalive, unlike the other write wrapper
functions, as shown below. But both of them write some data to the output
plugin and set the did_write flag, which updates the subscriber's
last_recv_timestamp used for the timeout check in LogicalRepApplyLoop. So, it
looks like adding the pair to both functions would be more accurate, in order
to reset the changes_count counter on the publisher?

@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,

/* Pop the error context stack */
error_context_stack = errcallback.previous;
+
+   /* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }


Best Regards,
Takamichi Osumi



RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread wangw.f...@fujitsu.com
On Fri, Mar 10, 2023 14:35 PM Amit Kapila  wrote:
> On Fri, Mar 10, 2023 at 11:17 AM Peter Smith  wrote:
> >
> > On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila  wrote:
> > >
> > > > On Thu, Mar 9, 2023 at 10:56 AM Peter Smith wrote:
> > > >
> > > > 2. rollback_prepared_cb_wrapper
> > > >
> > > >   /*
> > > > >   * If the plugin support two-phase commits then rollback prepared callback
> > > > >   * is mandatory
> > > > + *
> > > > + * FIXME: This should have been caught much earlier.
> > > >   */
> > > >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> > > >   ereport(ERROR,
> > > >
> > > > ~
> > > >
> > > > Why is this seemingly unrelated FIXME still in the patch?
> > > >
> > >
> > > After reading this Fixme comment and the error message ("logical
> > > replication at prepare time requires a %s callback
> > > rollback_prepared_cb"), I think we can move this and a similar check
> > > in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> > > function. This is because there is no use of letting prepare pass when
> > > we can't do a rollback or commit prepared. What do you think?
> > >
> >
> > My first impression was it sounds like a good idea to catch the
> > missing callbacks early as you said.
> >
> > But if you decide to check for missing commit/rollback callbacks early
> > in prepare_cb_wrapper(), then won't you also want to have equivalent
> > checking done earlier for stream_prepare_cb_wrapper()?
> >
> 
> Yeah, probably or we can leave the lazy checking as it is. In the
> ideal case, we could check for the presence of all the callbacks in
> StartupDecodingContext() but we delay it to find the missing methods
> later. One possibility is that we check for any missing method in
> StartupDecodingContext() if any one of prepare/streaming calls are
> present but not sure if that is any better than the current
> arrangement.
> 
> > And then it quickly becomes a slippery slope to question many other things:
> > - Why allow startup_cb if shutdown_cb is missing?
> >
> 
> I am not sure if there is a hard dependency between these two but
> their callers do check for Null before invoking those.
> 
> > - Why allow change_cb if commit_cb or rollback_cb is missing?
> 
> We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
> have rollback_cb() defined at all?
> 
> > - Why allow filter_prepare_cb if prepare_cb is missing?
> >
> 
> I am not so sure about this but If prepare gets filtered, we don't
> need to invoke prepare_cb.
> 
> > - etc.
> >
> > ~
> >
> > So I am wondering if the HEAD code lazy-check of the callback only at
> > the point where it is needed was actually a deliberate design choice
> > just to be simpler - e.g. we don't need to be so concerned about any
> > other callback dependencies.
> >
> 
> Yeah, changing that probably needs some more thought. I have mentioned
> one of the possibilities above.

This approach looks fine to me. So, I wrote a separate patch (0006) so that
it can be discussed and reviewed on its own.
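
For readers following along, the eager check discussed above can be sketched
roughly as below. This is only a standalone illustration under simplified
assumptions: DemoCallbacks, DemoCheckResult, demo_check_two_phase_callbacks
and demo_noop are hypothetical names, not PostgreSQL's OutputPluginCallbacks
and not the code in the 0006 patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for an output plugin's callback table. */
typedef void (*DemoCallback) (void);

typedef struct DemoCallbacks
{
    DemoCallback prepare_cb;
    DemoCallback commit_prepared_cb;
    DemoCallback rollback_prepared_cb;
} DemoCallbacks;

typedef enum DemoCheckResult
{
    DEMO_CALLBACKS_OK,
    DEMO_MISSING_COMMIT_PREPARED,
    DEMO_MISSING_ROLLBACK_PREPARED
} DemoCheckResult;

/*
 * Check the two-phase callbacks up front: if prepare_cb is defined, both
 * commit_prepared_cb and rollback_prepared_cb must be defined as well.
 */
static DemoCheckResult
demo_check_two_phase_callbacks(const DemoCallbacks *cb)
{
    assert(cb != NULL);

    if (cb->prepare_cb == NULL)
        return DEMO_CALLBACKS_OK;   /* no two-phase decoding requested */
    if (cb->commit_prepared_cb == NULL)
        return DEMO_MISSING_COMMIT_PREPARED;
    if (cb->rollback_prepared_cb == NULL)
        return DEMO_MISSING_ROLLBACK_PREPARED;
    return DEMO_CALLBACKS_OK;
}

/* Dummy callback, used only to populate a table when trying the check. */
static void
demo_noop(void)
{
}
```

In this sketch a caller would report an error as soon as the result is not
DEMO_CALLBACKS_OK, instead of waiting until the commit prepared or rollback
prepared wrapper is reached.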

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread wangw.f...@fujitsu.com
On Thur, Mar 9, 2023 13:26 PM Peter Smith  wrote:
> Here are some review comments for v6-0001

Thanks for your comments.

> ==
> General.
> 
> 1.
> There are lots of new comments saying:
> /* don't call update progress, we didn't really make any */
> 
> but is the wording "call update progress" meaningful?
> 
> Should that be written something more like:
> /* No progress has been made so there is no need to call
> UpdateProgressAndKeepalive. */

Changed.
I shortened your suggested comment using a grammar tool, so the modified
comment looks like this:
```
No progress has been made, so don't call UpdateProgressAndKeepalive
```

> ~~~
> 
> 4.
> 
> @@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
> 
>   /* Pop the error context stack */
>   error_context_stack = errcallback.previous;
> +
> + UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
>  }
> 
> ~
> 
> Are the double parentheses necessary?

I think the code looks clearer this way.

> ==
> src/backend/replication/walsender.c
> 
> 6. WalSndUpdateProgressAndKeepalive
> 
> Since the 'ctx' is unused here, it might be nicer to annotate that to
> make it clear it is deliberate and suppress any possible warnings
> about unused params.
> 
> e.g. something like:
> 
> WalSndUpdateProgressAndKeepalive(
> pg_attribute_unused() LogicalDecodingContext *ctx,
> XLogRecPtr lsn,
> TransactionId xid,
> bool did_write,
> bool finished_xact)

Because many functions don't use this approach, I’m not sure what the rules are
for using it in PG. And I think that we should discuss this on a separate thread
to check which similar functions need this kind of modification in PG source
code.

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread wangw.f...@fujitsu.com
On Wed, Mar 8, 2023 23:55 PM Osumi, Takamichi/大墨 昂道 
 wrote:
> Hi,
> 
> 
> On Wednesday, March 8, 2023 11:54 AM From: wangw.f...@fujitsu.com
>  wrote:
> > Attach the new patch.
> Thanks for sharing v6 ! Few minor comments for the same.

Thanks for your comments.

> (1) commit message
> 
> The old function name 'is_skip_threshold_change' is referred currently. We
> need to update it to 'is_keepalive_threshold_exceeded' I think.

Fixed.

> (2) OutputPluginPrepareWrite
> 
> @@ -662,7 +656,8 @@ void
>  OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
>  {
> if (!ctx->accept_writes)
> -   elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
> +   elog(ERROR, "writes are only accepted in output plugin callbacks, "
> +"except startup, shutdown, filter_by_origin, and filter_prepare.");
> 
> We can remove the period at the end of error string.

Removed.

> (3) is_keepalive_threshold_exceeded's comments
> 
> +/*
> + * Helper function to check whether a large number of changes have been skipped
> + * continuously.
> + */
> +static bool
> +is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)
> 
> I suggest to update the comment slightly something like below.
> From:
> ...whether a large number of changes have been skipped continuously
> To:
> ...whether a large number of changes have been skipped without being sent to
> the output plugin continuously

Makes sense.
Also, I slightly corrected the original function comment with a grammar check
tool. So, the modified comment looks like this:
```
Helper function to check for continuous skipping of many changes without sending
them to the output plugin.
```
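
As a rough illustration of what such a helper does, here is a minimal
standalone sketch. It assumes a plain per-context counter and a fixed
threshold; DemoDecodingState, DEMO_CHANGES_THRESHOLD (and its value of 100)
and demo_keepalive_threshold_exceeded are hypothetical names chosen for the
example, not the code in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed threshold, for illustration only; the patch may use another value. */
#define DEMO_CHANGES_THRESHOLD 100

/* Hypothetical stand-in for the part of the decoding context that counts. */
typedef struct DemoDecodingState
{
    int changes_count;  /* changes skipped since the last reset */
} DemoDecodingState;

/*
 * Count one more skipped change and report whether the threshold has been
 * reached. The counter is reset whenever true is returned, so the next run
 * of skipped changes starts from zero.
 */
static bool
demo_keepalive_threshold_exceeded(DemoDecodingState *state)
{
    assert(state != NULL);

    if (++state->changes_count < DEMO_CHANGES_THRESHOLD)
        return false;

    state->changes_count = 0;
    return true;
}
```

With this shape, a caller that skips a change would call the helper once per
skipped change and only try to update progress when it returns true.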

> (4) term for 'keepalive'
> 
> +/*
> + * Update progress tracking and send keep alive (if required).
> + */
> 
> The 'keep alive' might be better to be replaced with 'keepalive', which looks
> commonest in other source codes. In the current patch, there are 3 different
> ways to express it (the other one is 'keep-alive') and it would be better to
> unify the term, at least within the same patch ?

Yes, agree.
Unified the comment you mentioned here ('keep alive') and the comment in the
commit message ('keep-alive') as 'keepalive'.

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread wangw.f...@fujitsu.com
On Wed, Mar 8, 2023 19:06 PM Kuroda, Hayato/黒田 隼人  
wrote:
> Dear Wang,

Thanks for your testing and comments.

> ---
> ```
> +/*
> + * Update progress tracking and send keep alive (if required).
> + */
> +static void
> +UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
> ```
> 
> Can we add comments atop UpdateProgressAndKeepalive()? Currently, developers
> who create output plugins must call OutputPluginUpdateProgress(), but from now
> on the function is not only renamed but also no longer needs to be called from
> the plugin (of course, we do not restrict calling it). I think this must be
> clarified for them.

Makes sense.
Added some comments atop this function.

> ---
> ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

Removed.

> ---
> Do we have to write a document for the breakage somewhere? I think we do not
> have to add appendix-obsolete-* file because we did not have any links for
> that, but we can add a warning in "Functions for Producing Output" subsection
> if needed.

Since we've moved the feature (updating progress and sending keepalives) from
the output plugin into the infrastructure, the output plugin is no longer
responsible for maintaining it. Also, I think output plugin developers only
need to remove the call to the old function OutputPluginUpdateProgress if they
get compile errors related to this modification. So, it seems to me that we
don't need to make related changes in the documentation.

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-10 Thread wangw.f...@fujitsu.com
On Fri, Mar 10, 2023 11:56 AM Amit Kapila  wrote:
> On Wed, Mar 8, 2023 at 8:24 AM wangw.f...@fujitsu.com
>  wrote:
> >
> > Attach the new patch.
> >
> 
> I think this combines multiple improvements in one patch. We can
> consider all of them together or maybe it would be better to split
> some of those. Do we think it makes sense to split some of the
> improvements? I could think of below:
> 
> 1. Remove SyncRepRequested() check from WalSndUpdateProgress().
> 2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
> any other similar place.
> 3. Change the name of ProcessPendingWrites() to WalSndSendPending().
> 4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
> 5. The remaining patch.

I think it would help to review different improvements separately, so I split
the patch as suggested.

I also addressed the comments by Kuroda-san, Osumi-san and Peter.
Attached is the new patch set.

Regards,
Wang wei


v7-0001-Remove-SyncRepRequested-check-from-WalSndUpdatePr.patch
Description:  v7-0001-Remove-SyncRepRequested-check-from-WalSndUpdatePr.patch


v7-0002-Check-wal_sender_timeout-is-in-effect-before-usin.patch
Description:  v7-0002-Check-wal_sender_timeout-is-in-effect-before-usin.patch


v7-0003-Rename-the-function-ProcessPendingWrites-to-WalSn.patch
Description:  v7-0003-Rename-the-function-ProcessPendingWrites-to-WalSn.patch


v7-0004-Rename-the-function-WalSndUpdateProgress-to-WalSn.patch
Description:  v7-0004-Rename-the-function-WalSndUpdateProgress-to-WalSn.patch


v7-0005-Rework-LogicalOutputPluginWriterUpdateProgressAnd.patch
Description:  v7-0005-Rework-LogicalOutputPluginWriterUpdateProgressAnd.patch


v7-0006-Catch-the-absence-of-commit-rollback_prepared_cb_.patch
Description:  v7-0006-Catch-the-absence-of-commit-rollback_prepared_cb_.patch


Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-09 Thread Amit Kapila
On Fri, Mar 10, 2023 at 11:17 AM Peter Smith  wrote:
>
> On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila  wrote:
> >
> > On Thu, Mar 9, 2023 at 10:56 AM Peter Smith  wrote:
> > >
> > > 2. rollback_prepared_cb_wrapper
> > >
> > >   /*
> > >   * If the plugin support two-phase commits then rollback prepared callback
> > >   * is mandatory
> > > + *
> > > + * FIXME: This should have been caught much earlier.
> > >   */
> > >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> > >   ereport(ERROR,
> > >
> > > ~
> > >
> > > Why is this seemingly unrelated FIXME still in the patch?
> > >
> >
> > After reading this Fixme comment and the error message ("logical
> > replication at prepare time requires a %s callback
> > rollback_prepared_cb"), I think we can move this and a similar check
> > in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> > function. This is because there is no use of letting prepare pass when
> > we can't do a rollback or commit prepared. What do you think?
> >
>
> My first impression was it sounds like a good idea to catch the
> missing callbacks early as you said.
>
> But if you decide to check for missing commit/rollback callbacks early
> in prepare_cb_wrapper(), then won't you also want to have equivalent
> checking done earlier for stream_prepare_cb_wrapper()?
>

Yeah, probably, or we can leave the lazy checking as it is. In the
ideal case, we could check for the presence of all the callbacks in
StartupDecodingContext(), but currently we delay the check and find the
missing methods later. One possibility is to check for any missing method
in StartupDecodingContext() if any one of the prepare/streaming callbacks
is present, but I am not sure if that is any better than the current
arrangement.

> And then it quickly becomes a slippery slope to question many other things:
> - Why allow startup_cb if shutdown_cb is missing?
>

I am not sure if there is a hard dependency between these two, but
their callers do check for NULL before invoking them.

> - Why allow change_cb if commit_cb or rollback_cb is missing?

We have a check for change_cb and commit_cb in LoadOutputPlugin. Do we
have rollback_cb() defined at all?

> - Why allow filter_prepare_cb if prepare_cb is missing?
>

I am not so sure about this, but if prepare gets filtered, we don't
need to invoke prepare_cb.

> - etc.
>
> ~
>
> So I am wondering if the HEAD code lazy-check of the callback only at
> the point where it is needed was actually a deliberate design choice
> just to be simpler - e.g. we don't need to be so concerned about any
> other callback dependencies.
>

Yeah, changing that probably needs some more thought. I have mentioned
one of the possibilities above.

-- 
With Regards,
Amit Kapila.




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-09 Thread Peter Smith
On Fri, Mar 10, 2023 at 3:32 PM Amit Kapila  wrote:
>
> On Thu, Mar 9, 2023 at 10:56 AM Peter Smith  wrote:
> >
> > 2. rollback_prepared_cb_wrapper
> >
> >   /*
> >   * If the plugin support two-phase commits then rollback prepared callback
> >   * is mandatory
> > + *
> > + * FIXME: This should have been caught much earlier.
> >   */
> >   if (ctx->callbacks.rollback_prepared_cb == NULL)
> >   ereport(ERROR,
> >
> > ~
> >
> > Why is this seemingly unrelated FIXME still in the patch?
> >
>
> After reading this Fixme comment and the error message ("logical
> replication at prepare time requires a %s callback
> rollback_prepared_cb"), I think we can move this and a similar check
> in function commit_prepared_cb_wrapper() to prepare_cb_wrapper()
> function. This is because there is no use of letting prepare pass when
> we can't do a rollback or commit prepared. What do you think?
>

My first impression was it sounds like a good idea to catch the
missing callbacks early as you said.

But if you decide to check for missing commit/rollback callbacks early
in prepare_cb_wrapper(), then won't you also want to have equivalent
checking done earlier for stream_prepare_cb_wrapper()?

And then it quickly becomes a slippery slope to question many other things:
- Why allow startup_cb if shutdown_cb is missing?
- Why allow change_cb if commit_cb or rollback_cb is missing?
- Why allow filter_prepare_cb if prepare_cb is missing?
- etc.

~

So I am wondering if the HEAD code lazy-check of the callback only at
the point where it is needed was actually a deliberate design choice
just to be simpler - e.g. we don't need to be so concerned about any
other callback dependencies.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-09 Thread Amit Kapila
On Thu, Mar 9, 2023 at 10:56 AM Peter Smith  wrote:
>
> 2. rollback_prepared_cb_wrapper
>
>   /*
>   * If the plugin support two-phase commits then rollback prepared callback
>   * is mandatory
> + *
> + * FIXME: This should have been caught much earlier.
>   */
>   if (ctx->callbacks.rollback_prepared_cb == NULL)
>   ereport(ERROR,
>
> ~
>
> Why is this seemingly unrelated FIXME still in the patch?
>

After reading this FIXME comment and the error message ("logical
replication at prepare time requires a %s callback
rollback_prepared_cb"), I think we can move this check and the similar one
in commit_prepared_cb_wrapper() to prepare_cb_wrapper(). This is because
there is no point in letting prepare pass when we can't do a rollback or
commit prepared. What do you think?

>
> 4.
>
> @@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
> ReorderBufferTXN *txn,
>
>   /* Pop the error context stack */
>   error_context_stack = errcallback.previous;
> +
> + UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
>  }
>
> ~
>
> Are the double parentheses necessary?
>

Personally, I find this style easier to follow.

-- 
With Regards,
Amit Kapila.




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-09 Thread Amit Kapila
On Wed, Mar 8, 2023 at 8:24 AM wangw.f...@fujitsu.com
 wrote:
>
> Attach the new patch.
>

I think this combines multiple improvements in one patch. We can
consider all of them together or maybe it would be better to split
some of those. Do we think it makes sense to split some of the
improvements? I could think of below:

1. Remove SyncRepRequested() check from WalSndUpdateProgress().
2. Add check of wal_sender_timeout > 0 in WalSndUpdateProgress() and
any other similar place.
3. Change the name of ProcessPendingWrites() to WalSndSendPending().
4. Change WalSndUpdateProgress() to WalSndUpdateProgressAndKeepalive().
5. The remaining patch.
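
To illustrate item 2, here is a minimal standalone sketch of a keepalive
decision that first checks whether the timeout is in effect. It is not the
walsender code: demo_keepalive_due is a hypothetical helper, times are plain
milliseconds, and the "half of the timeout" rule is an assumption made for
the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Decide whether a keepalive should be requested. All times are in
 * milliseconds. A timeout of 0 (or less) means the timeout is disabled, so
 * no keepalive is needed for timeout purposes.
 */
static bool
demo_keepalive_due(int64_t now_ms, int64_t last_reply_ms, int timeout_ms)
{
    /* Sanity check for the sketch: the clock does not run backwards. */
    assert(now_ms >= last_reply_ms);

    if (timeout_ms <= 0)
        return false;   /* timeout disabled: skip the time comparison */

    /* Assumed rule: ask for a reply once half of the timeout has elapsed. */
    return now_ms >= last_reply_ms + timeout_ms / 2;
}
```

The point of the early return is that no time comparison is made at all when
the timeout is disabled, which is what the "wal_sender_timeout > 0" check in
item 2 is meant to guarantee.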

Now, for (1), we can consider backpatching but I am not sure if it is
worth it because in the worst case, we will miss sending a keepalive.
For (4), it is not clear to me that we have a complete agreement on
the new name. Andres, do you have an opinion on the new name used in
the patch?

If we agree that we don't need to backpatch for (1) and the new name
for (4) is reasonable then we can commit 1-4 as one patch and then
look at the remaining patch.

Thoughts?

-- 
With Regards,
Amit Kapila.




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-08 Thread Peter Smith
Here are some review comments for v6-0001

==
General.

1.
There are lots of new comments saying:
/* don't call update progress, we didn't really make any */

but is the wording "call update progress" meaningful?

Should that be written something more like:
/* No progress has been made so there is no need to call
UpdateProgressAndKeepalive. */

==

2. rollback_prepared_cb_wrapper

  /*
  * If the plugin support two-phase commits then rollback prepared callback
  * is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
  */
  if (ctx->callbacks.rollback_prepared_cb == NULL)
  ereport(ERROR,

~

Why is this seemingly unrelated FIXME still in the patch? I thought it
was posted a while ago (See [1] comment #8) that this would be
deleted.

~~~

4.

@@ -1370,6 +1377,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache,
ReorderBufferTXN *txn,

  /* Pop the error context stack */
  error_context_stack = errcallback.previous;
+
+ UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
 }

~

Are the double parentheses necessary?

~~~

5. UpdateProgressAndKeepalive

I had previously suggested (See [2] comment #3) that the code might be
simplified if the "is_keepalive_threshold_exceeded(ctx)" check was
pushed down into this function, but it seems like nobody else gave any
opinion for/against that idea yet... so the question still stands.

==
src/backend/replication/walsender.c

6. WalSndUpdateProgressAndKeepalive

Since the 'ctx' is unused here, it might be nicer to annotate that to
make it clear it is deliberate and suppress any possible warnings
about unused params.

e.g. something like:

WalSndUpdateProgressAndKeepalive(
pg_attribute_unused() LogicalDecodingContext *ctx,
XLogRecPtr lsn,
TransactionId xid,
bool did_write,
bool finished_xact)
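
For illustration, a standalone sketch of how such an annotation behaves is
shown below. It deliberately does not use PostgreSQL headers:
demo_attribute_unused(), DemoContext and demo_should_update_progress are
hypothetical stand-ins, and the macro assumes a GCC- or Clang-compatible
compiler for the attribute to have any effect.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical stand-in for an "unused" annotation: expands to a compiler
 * attribute on GCC and Clang, and to nothing elsewhere.
 */
#if defined(__GNUC__) || defined(__clang__)
#define demo_attribute_unused() __attribute__((unused))
#else
#define demo_attribute_unused()
#endif

/* Opaque context type; it is never dereferenced in this sketch. */
typedef struct DemoContext DemoContext;

/*
 * 'ctx' is deliberately unused. The annotation documents that and silences
 * warnings such as -Wunused-parameter on compilers that support it.
 */
static bool
demo_should_update_progress(demo_attribute_unused() DemoContext *ctx,
                            int pending_changes, bool finished_xact)
{
    assert(pending_changes >= 0);

    return finished_xact || pending_changes > 0;
}
```

The function body is made up for the example; only the placement of the
annotation on the first parameter mirrors the suggestion above.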

--
[1] https://www.postgresql.org/message-id/OS3PR01MB6275C6CA7C0C23730A319EAD9%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[2] https://www.postgresql.org/message-id/CAHut%2BPt3ZEMo-KTF%3D5KJSU%2BHdWJD19GPGGCKOmBeM47484Ychw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia.




RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-08 Thread Takamichi Osumi (Fujitsu)
Hi,


On Wednesday, March 8, 2023 11:54 AM From: wangw.f...@fujitsu.com 
 wrote:
> Attach the new patch.
Thanks for sharing v6! A few minor comments on it.

(1) commit message

The old function name 'is_skip_threshold_change' is still referenced there. I
think we need to update it to 'is_keepalive_threshold_exceeded'.

(2) OutputPluginPrepareWrite

@@ -662,7 +656,8 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
if (!ctx->accept_writes)
-   elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+   elog(ERROR, "writes are only accepted in output plugin callbacks, "
+"except startup, shutdown, filter_by_origin, and filter_prepare.");

We can remove the period at the end of error string.

(3) is_keepalive_threshold_exceeded's comments

+/*
+ * Helper function to check whether a large number of changes have been skipped
+ * continuously.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)

I suggest updating the comment slightly, something like below.
From:
...whether a large number of changes have been skipped continuously
To:
...whether a large number of changes have been skipped without being sent to 
the output plugin continuously

(4) term for 'keepalive'

+/*
+ * Update progress tracking and send keep alive (if required).
+ */

It might be better to replace 'keep alive' with 'keepalive', which looks to be
the most common spelling elsewhere in the source code. The current patch uses 3
different spellings (the third is 'keep-alive'), so it would be better to unify
the term, at least within the same patch.


Best Regards,
Takamichi Osumi



RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-08 Thread Hayato Kuroda (Fujitsu)
Dear Wang,

Thank you for updating the patch! I have briefly tested your patch
and it worked well in the following cases.

* WalSndUpdateProgressAndKeepalive is called when many inserts have come
  but the publisher does not publish the insertion. PSA the script for this.
* WalSndUpdateProgressAndKeepalive is called when the commit record is not
  related to the specified database.
* WalSndUpdateProgressAndKeepalive is called when many inserts into unlogged
  tables are done.

> > ---
> > 01. missing comments
> > You might miss the comment from Peter[1]. Or could you pin the related one?
> 
> Since I think the functions WalSndPrepareWrite and WalSndWriteData have
> similar parameters and the HEAD has no related comments, I'm not sure whether
> we should add them in this patch, or in a separate patch to comment atop these
> callback functions or where they are called.

Make sense, OK.

> > ---
> > 02. LogicalDecodingProcessRecord()
> >
> > Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
> > decoding function? Assuming that the timeout parameter does not have enough
> > time period and there are so many sequential operations in the transaction.
> > At that time there may be a possibility that timeout is occurred while
> > calling ReorderBufferProcessXid() several times.  It may be a bad example,
> > but I meant to say that we may have to consider the case that decoding
> > function has not implemented yet.
> 
> I think it's ok in this function. If the decoding function has not been
> implemented for a record, I think we quickly return to the loop in the
> function WalSndLoop, where it will try to send the keepalive message.

I confirmed that and yes, we will go back to WalSndLoop().

> BTW, in the previous discussion [1], we decided to ignore some paths, because
> the gain from modifying them may not be so great.

I missed the discussion, thanks. Based on that, the code seems right.

The following are my comments.

---
```
+/*
+ * Update progress tracking and send keep alive (if required).
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
```

Can we add comments atop UpdateProgressAndKeepalive()? Currently, developers
who create output plugins must call OutputPluginUpdateProgress(), but from now
on the function is not only renamed but also no longer needs to be called from
the plugin (of course, we do not restrict calling it). I think this must be
clarified for them.

---
ReorderBufferUpdateProgressTxnCB must be removed from typedefs.list.

---
Do we have to document the breakage somewhere? I think we do not have to add
an appendix-obsolete-* file because we did not have any links for that, but
we can add a warning in the "Functions for Producing Output" subsection if
needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



test.sh
Description: test.sh


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-07 Thread wangw.f...@fujitsu.com
On Tue, Mar 7, 2023 15:55 PM Kuroda, Hayato/黒田 隼人  
wrote:
> Dear Wang,
> 
> Thank you for updating the patch! Followings are my comments.

Thanks for your comments.

> ---
> 01. missing comments
> You might miss the comment from Peter[1]. Or could you pin the related one?

Since I think the functions WalSndPrepareWrite and WalSndWriteData have similar
parameters and the HEAD has no related comments, I'm not sure whether we should
add them in this patch, or in a separate patch to comment atop these callback
functions or where they are called.

> ---
> 02. LogicalDecodingProcessRecord()
> 
> Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
> decoding function? Assuming that the timeout parameter does not have enough
> time period and there are so many sequential operations in the transaction.
> At that time there may be a possibility that timeout is occurred while
> calling ReorderBufferProcessXid() several times.  It may be a bad example,
> but I meant to say that we may have to consider the case that decoding
> function has not implemented yet.

I think it's ok in this function. If the decoding function has not been
implemented for a record, I think we quickly return to the loop in the function
WalSndLoop, where it will try to send the keepalive message.

BTW, in the previous discussion [1], we decided to ignore some paths, because
the gain from modifying them may not be so great.

> ---
> 03. stream_*_cb_wrapper
> 
> Only stream_*_cb_wrapper have comments "don't call update progress, we
> didn't really make any", but there are more functions that does not send
> updates. Do you have any reasons why only they have?

Added this comment to more functions.
I think the following six functions don't call the function
UpdateProgressAndKeepalive in v5 patch:
- begin_cb_wrapper
- begin_prepare_cb_wrapper
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

I think the comment you mentioned means that no new progress needs to be updated
in this *_cb_wrapper. Also, I think we don't need to update the progress at the
beginning of a transaction, just like in HEAD. So, I added the same comment only
in the 4 functions below:
- startup_cb_wrapper
- shutdown_cb_wrapper
- filter_prepare_cb_wrapper
- filter_by_origin_cb_wrapper

Attached is the new patch.

[1] - https://www.postgresql.org/message-id/20230213180302.u5sqosteflr3zkiz%40awork3.anarazel.de

Regards,
Wang wei


v6-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch
Description:  v6-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-06 Thread Hayato Kuroda (Fujitsu)
Dear Wang,

Thank you for updating the patch! The following are my comments.

---
01. missing comments
You might have missed the comment from Peter [1]. Or could you point to the reply that addresses it?

---
02. LogicalDecodingProcessRecord()

Don't we have to call UpdateDecodingProgressAndKeepalive() when there is no
decoding function? Suppose the timeout parameter is set too short and the
transaction contains many sequential operations. In that case, a timeout could
occur while ReorderBufferProcessXid() is being called several times. It may be
a bad example, but I meant to say that we may have to consider the case where
the decoding function has not been implemented yet.

---
03. stream_*_cb_wrapper

Only the stream_*_cb_wrapper functions have the comment "don't call update
progress, we didn't really make any", but there are more functions that do not
send updates. Is there a reason why only they have it?

[1]: https://www.postgresql.org/message-id/CAHut%2BPsksiQHuv4A54R4w79TAvCu__PcuffKYY0V96e2z_sEvA%40mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-06 Thread wangw.f...@fujitsu.com
On Fri, Mar 3, 2023 8:18 AM Peter Smith  wrote:
> 

Thanks for your comments.

> 1.
> +
> +static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
> +bool finished_xact);
> +
> +static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);
> 
> 1a.
> There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

Removed.

> ~
> 
> 1b.
> I did not recognize a reason for the different naming conventions.
> Here are two new functions but one is CamelCase and one is snake_case.
> What are the rules to decide the naming?

I used the snake_case style for the function UpdateProgressAndKeepalive in the
previous version, but that was confusing because the function then had the same
name as a parameter of the functions StartupDecodingContext,
CreateInitDecodingContext and CreateDecodingContext. To avoid this confusion,
and since both naming styles exist in this file, I changed it to CamelCase
style.

Attached is the new patch.

Regards,
Wang wei


Attachment: v5-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch


Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-02 Thread Peter Smith
On Fri, Mar 3, 2023 at 1:27 PM houzj.f...@fujitsu.com
 wrote:
>
> On Friday, March 3, 2023 8:18 AM Peter Smith  wrote:
...
> > Anyway, I think this exposes another problem. If you still want the patch
> > to pass the 'finished_xact' parameter separately then AFAICT the first
> > parameter (ctx) now becomes unused/redundant in the
> > WalSndUpdateProgressAndKeepalive function, so it ought to be removed.
> >
>
> I am not sure about this. The first parameter (ctx) has been there since the
> lag tracking feature was introduced. I think it is there for consistency
> with the other LogicalOutputPluginWriter callbacks. In addition, this is a
> public callback and users can implement their own logic in it based on the
> interface, so removing this existing parameter doesn't look great to me.
> This patch does also remove the existing skipped_xact parameter, but only
> because we decided to use another parameter, did_write, which can play a
> similar role.
>

Oh right, that makes sense. Thanks.

Perhaps it just wants some comment to mention that although the
built-in implementation does not use the 'ctx' users might implement
their own logic which does use it.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-02 Thread houzj.f...@fujitsu.com
On Friday, March 3, 2023 8:18 AM Peter Smith  wrote:
> On Wed, Mar 1, 2023 at 9:16 PM wangw.f...@fujitsu.com
>  wrote:
> >
> > On Tues, Feb 28, 2023 at 9:12 AM Peter Smith wrote:
> > > Here are some comments for the v2-0001 patch.
> > >
> > > (I haven't looked at the v3 that was posted overnight; maybe some of
> > > my comments have already been addressed.)
> >
> > Thanks for your comments.
> >
> > > ==
> > > General
> > >
> > > 1. (Info from the commit message)
> > > Since we can know whether the change is an end of transaction change
> > > in the common code, we removed the LogicalDecodingContext->end_xact
> > > introduced in commit f95d53e.
> > >
> > > ~
> > >
> > > TBH, it was not clear to me that this change was an improvement.
> > > IIUC, it removes the "unnecessary" member, but only does that by
> > > replacing it everywhere with a boolean parameter passed to
> > > update_progress_and_keepalive(). So the end result seems no less
> > > code, but it is less readable code now because you need to know what
> > > the true/false parameter means. I wonder if it would have been
> > > better just to leave this how it was.
> >
> > Since I think we can know the meaning of the input based on the
> > parameter name of the function, I think both approaches are fine. But
> > the approach in the current patch can reduce a member of the
> > structure, so I think this modification looks good to me.
> >
> 
...
> 
> Anyway, I think this exposes another problem. If you still want the patch to
> pass the 'finished_xact' parameter separately then AFAICT the first parameter
> (ctx) now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
> function, so it ought to be removed.
> 

I am not sure about this. The first parameter (ctx) has been there since the
lag tracking feature was introduced. I think it is there for consistency with
the other LogicalOutputPluginWriter callbacks. In addition, this is a public
callback and users can implement their own logic in it based on the interface,
so removing this existing parameter doesn't look great to me. This patch does
also remove the existing skipped_xact parameter, but only because we decided to
use another parameter, did_write, which can play a similar role.

Best Regards,
Hou zj


Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-02 Thread Andres Freund
Hi,

On 2023-03-03 11:18:04 +1100, Peter Smith wrote:
> - Why is reducing members of LogicalDecodingContext even a goal? I
> thought the LogicalDecodingContext is intended to be the one-stop
> place to hold *all* things related to the "Context" (including that
> member that was deleted).

There's not really a reason to keep it in LogicalDecodingContext after
this change. It was only needed there because of the broken
architectural model of calling UpdateProgress from within output
plugins. Why set a field in each wrapper that we don't need?

> - How is reducing one member better than introducing one new parameter
> in multiple calls?

Reducing the member isn't important, needing to set it before each
callback however makes sense.

Greetings,

Andres Freund




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-02 Thread Peter Smith
On Wed, Mar 1, 2023 at 9:16 PM wangw.f...@fujitsu.com
 wrote:
>
> On Tues, Feb 28, 2023 at 9:12 AM Peter Smith  wrote:
> > Here are some comments for the v2-0001 patch.
> >
> > (I haven't looked at the v3 that was posted overnight; maybe some of
> > my comments have already been addressed.)
>
> Thanks for your comments.
>
> > ==
> > General
> >
> > 1. (Info from the commit message)
> > Since we can know whether the change is an end of transaction change in the
> > common code, we removed the LogicalDecodingContext->end_xact introduced in
> > commit f95d53e.
> >
> > ~
> >
> > TBH, it was not clear to me that this change was an improvement. IIUC,
> > it removes the "unnecessary" member, but only does that by replacing
> > it everywhere with a boolean parameter passed to
> > update_progress_and_keepalive(). So the end result seems no less code,
> > but it is less readable code now because you need to know what the
> > true/false parameter means. I wonder if it would have been better just
> > to leave this how it was.
>
> Since I think we can know the meaning of the input based on the parameter name
> of the function, I think both approaches are fine. But the approach in the
> current patch can reduce a member of the structure, so I think this
> modification looks good to me.
>

Hmm, I am not so sure:

- Why is reducing members of LogicalDecodingContext even a goal? I
thought the LogicalDecodingContext is intended to be the one-stop
place to hold *all* things related to the "Context" (including that
member that was deleted).

- How is reducing one member better than introducing one new parameter
in multiple calls?

Anyway, I think this exposes another problem. If you still want the
patch to pass the 'finished_xact' parameter separately then AFAICT the
first parameter (ctx) now becomes unused/redundant in the
WalSndUpdateProgressAndKeepalive function, so it ought to be removed.

> > ==
> > src/backend/replication/logical/logical.c
> >
> > 3. General - calls to is_skip_threshold_change()
> >
> > + if (is_skip_threshold_change(ctx))
> > + update_progress_and_keepalive(ctx, false);
> >
> > There are multiple calls like this, which are guarding the
> > update_progress_and_keepalive() with the is_skip_threshold_change()
> > - See truncate_cb_wrapper
> > - See message_cb_wrapper
> > - See stream_change_cb_wrapper
> > - See stream_message_cb_wrapper
> > - See stream_truncate_cb_wrapper
> > - See UpdateDecodingProgressAndKeepalive
> >
> > IIUC, then I was thinking all those conditions maybe can be pushed
> > down *into* the wrapper, thereby making every calling code simpler.
> >
> > e.g. make the wrapper function code look similar to the current
> > UpdateDecodingProgressAndKeepalive:
> >
> > BEFORE (update_progress_and_keepalive)
> > {
> >     if (!ctx->update_progress_and_keepalive)
> >         return;
> >
> >     ctx->update_progress_and_keepalive(ctx, ctx->write_location,
> >                                        ctx->write_xid, ctx->did_write,
> >                                        finished_xact);
> > }
> > AFTER
> > {
> >     if (!ctx->update_progress_and_keepalive)
> >         return;
> >
> >     if (finished_xact || is_skip_threshold_change(ctx))
> >     {
> >         ctx->update_progress_and_keepalive(ctx, ctx->write_location,
> >                                            ctx->write_xid, ctx->did_write,
> >                                            finished_xact);
> >     }
> > }
>
> Since I want to keep the function update_progress_and_keepalive simple, I
> didn't change it.

Hmm, the reason given seems like a false economy to me. You are able
to keep this 1 function simpler only by adding more complexity to the
calls in 6 other places. Let's see if other people have opinions about
this.
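
To make the trade-off concrete, here is a minimal, self-contained sketch of the
"AFTER" shape, with the threshold check pushed down into the wrapper. It is not
the patch's code: DemoContext, demo_threshold_exceeded(), DEMO_THRESHOLD and
the counters are hypothetical stand-ins for LogicalDecodingContext,
is_skip_threshold_change() and the real callback, and the counting rule is an
assumption made only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for LogicalDecodingContext. */
typedef struct DemoContext DemoContext;
typedef void (*DemoUpdateCallback) (DemoContext *ctx, bool finished_xact);

struct DemoContext
{
    DemoUpdateCallback update_cb;   /* may be NULL, as in the quoted code */
    int         changes_count;      /* changes seen since the last update */
    int         updates_sent;       /* bookkeeping for this demo only */
};

/* Assumed threshold; the real value is not given in this thread. */
#define DEMO_THRESHOLD 100

/* Counts one change and reports whether the threshold has been reached. */
static bool
demo_threshold_exceeded(DemoContext *ctx)
{
    if (++ctx->changes_count < DEMO_THRESHOLD)
        return false;

    ctx->changes_count = 0;
    return true;
}

/* Demo callback: only records that an update was sent. */
static void
demo_count_update(DemoContext *ctx, bool finished_xact)
{
    (void) finished_xact;
    ctx->updates_sent++;
}

/*
 * The guard lives here, so every caller can invoke the wrapper
 * unconditionally instead of repeating the threshold check.
 */
static void
demo_update_progress_and_keepalive(DemoContext *ctx, bool finished_xact)
{
    assert(ctx != NULL);

    if (ctx->update_cb == NULL)
        return;

    if (finished_xact || demo_threshold_exceeded(ctx))
        ctx->update_cb(ctx, finished_xact);
}
```

With this shape a caller such as a truncate or message wrapper would shrink to
a single unconditional call. Note that, because of short-circuit evaluation,
the counter in this sketch is not advanced when finished_xact is true.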

~~~

1.
+
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);

1a.
There is an unnecessary extra blank line above the UpdateProgressAndKeepalive.

~

1b.
I did not recognize a reason for the different naming conventions.
Here are two new functions but one is CamelCase and one is snake_case.
What are the rules to decide the naming?

--
Kind Regards,
Peter Smith.
Fujitsu Australia




RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-01 Thread wangw.f...@fujitsu.com
On Tues, Feb 28, 2023 at 11:31 AM Osumi, Takamichi/大墨 昂道 
 wrote:
> Hi,
> 
> 
> On Monday, February 27, 2023 6:30 PM wangw.f...@fujitsu.com
>  wrote:
> > Attach the new patch.
> Thanks for sharing v3. Minor review comments and question.

Thanks for your comments.

> (1) UpdateDecodingProgressAndKeepalive header comment
> 
> The comment should probably be updated to explain why we reset some other
> flags, as discussed in [1], and to describe briefly the function's
> progress-update and keepalive behavior.

Added the comments atop the function UpdateDecodingProgressAndKeepalive about
when to call this function.

> (2) OutputPluginPrepareWrite
> 
> Probably the changed error string is too wide.
> 
> @@ -662,7 +657,7 @@ void
>  OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
>  {
>      if (!ctx->accept_writes)
> -        elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
> +        elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");
> 
> I think you could break the error message into two string literals on
> separate lines, or rephrase it.

I tried to improve this message and broke it into two lines in the new patch.
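
For readers unfamiliar with the technique: C concatenates adjacent string
literals at compile time, so a long message can be split across source lines
without changing the resulting text. The sketch below uses the wording quoted
in the review, which is not necessarily the wording of the final patch, and the
names demo_error_message and demo_get_error_message are hypothetical.

```c
#include <assert.h>
#include <string.h>

/*
 * Adjacent string literals are merged into one string by the compiler.
 * Each piece ends with a space so that the words do not run together.
 */
static const char *const demo_error_message =
    "writes are only accepted in callbacks in the OutputPluginCallbacks "
    "structure (except startup, shutdown, filter_by_origin and "
    "filter_prepare callbacks)";

/* Returns the assembled message after checking it has no line breaks. */
static const char *
demo_get_error_message(void)
{
    assert(strchr(demo_error_message, '\n') == NULL);
    return demo_error_message;
}
```

The string passed to elog() would be identical to the single-line version;
only the source layout changes.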

> (3) Minor question
> 
> The patch introduced goto statements into the cb_wrapper functions. Is the
> purpose to call update_progress_and_keepalive after popping the error context
> stack, even if the corresponding callback is missing? I thought we could just
> have an "else" clause for the callback-existence check, but did you choose the
> current goto style for readability?

I think both styles look fine to me.
I haven't modified this for this version. I'll reconsider if anyone else has
similar thoughts later.
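
As a rough illustration of the two styles under discussion, the sketch below
shows a wrapper that skips an optional callback with goto and one that guards
it with a conditional. Both always reach the common tail. DemoState,
DemoCallback and the counters are hypothetical; the real wrappers also set up
and pop an error context, which is omitted here.

```c
#include <assert.h>
#include <stddef.h>

typedef struct DemoState
{
    int         callbacks_run;      /* how often the optional callback ran */
    int         progress_updates;   /* how often the common tail ran */
} DemoState;

typedef void (*DemoCallback) (DemoState *state);

/* Demo callback: only records that it was invoked. */
static void
demo_callback(DemoState *state)
{
    state->callbacks_run++;
}

/* Style 1: jump over the callback and fall into the common tail. */
static void
demo_wrapper_goto(DemoState *state, DemoCallback cb)
{
    assert(state != NULL);

    if (cb == NULL)
        goto update_progress;

    cb(state);

update_progress:
    state->progress_updates++;
}

/* Style 2: guard the callback with a conditional; the tail is the same. */
static void
demo_wrapper_if(DemoState *state, DemoCallback cb)
{
    assert(state != NULL);

    if (cb != NULL)
        cb(state);

    state->progress_updates++;
}
```

The two functions behave identically, so the choice is purely one of
readability, which matches the reply above.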

> (4) Name of is_skip_threshold_change
> 
> I also feel the name of is_skip_threshold_change can be changed to
> "exceeded_keepalive_threshold" or something. Other candidates are proposed
> by Peter-san in [2].

Renamed this function to is_keepalive_threshold_exceeded.

Please see the new patch in [1].

[1] - https://www.postgresql.org/message-id/OS3PR01MB6275C6CA7C0C23730A319EAD9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-01 Thread wangw.f...@fujitsu.com
On Tues, Feb 28, 2023 at 9:12 AM Peter Smith  wrote:
> Here are some comments for the v2-0001 patch.
> 
> (I haven't looked at the v3 that was posted overnight; maybe some of
> my comments have already been addressed.)

Thanks for your comments.

> ==
> General
> 
> 1. (Info from the commit message)
> Since we can know whether the change is an end of transaction change in the
> common code, we removed the LogicalDecodingContext->end_xact introduced in
> commit f95d53e.
> 
> ~
> 
> TBH, it was not clear to me that this change was an improvement. IIUC,
> it removes the "unnecessary" member, but only does that by replacing
> it everywhere with a boolean parameter passed to
> update_progress_and_keepalive(). So the end result seems no less code,
> but it is less readable code now because you need to know what the
> true/false parameter means. I wonder if it would have been better just
> to leave this how it was.

Since I think we can know the meaning of the input based on the parameter name
of the function, I think both approaches are fine. But the approach in the
current patch can reduce a member of the structure, so I think this modification
looks good to me.

> ==
> src/backend/replication/logical/logical.c
> 
> 2. General - blank lines
> 
> There are multiple places in this file where the patch removed some
> statements but left blank lines. The result is 2 blank lines remaining
> instead of one.
> 
> see change_cb_wrapper.
> see truncate_cb_wrapper.
> see stream_start_cb_wrapper.
> see stream_stop_cb_wrapper.
> see stream_change_cb_wrapper.
> 
> e.g.
> 
> BEFORE
> ctx->write_location = last_lsn;
> 
> ctx->end_xact = false;
> 
> /* in streaming mode, stream_stop_cb is required */
> 
> AFTER (now there are 2 blank lines)
> ctx->write_location = last_lsn;
> 
> 
> /* in streaming mode, stream_stop_cb is required */

Removed.

> ~~~
> 
> 3. General - calls to is_skip_threshold_change()
> 
> + if (is_skip_threshold_change(ctx))
> + update_progress_and_keepalive(ctx, false);
> 
> There are multiple calls like this, which are guarding the
> update_progress_and_keepalive() with the is_skip_threshold_change()
> - See truncate_cb_wrapper
> - See message_cb_wrapper
> - See stream_change_cb_wrapper
> - See stream_message_cb_wrapper
> - See stream_truncate_cb_wrapper
> - See UpdateDecodingProgressAndKeepalive
> 
> IIUC, then I was thinking all those conditions maybe can be pushed
> down *into* the wrapper, thereby making every calling code simpler.
> 
> e.g. make the wrapper function code look similar to the current
> UpdateDecodingProgressAndKeepalive:
> 
> BEFORE (update_progress_and_keepalive)
> {
>     if (!ctx->update_progress_and_keepalive)
>         return;
>
>     ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>                                        ctx->write_xid, ctx->did_write,
>                                        finished_xact);
> }
> AFTER
> {
>     if (!ctx->update_progress_and_keepalive)
>         return;
>
>     if (finished_xact || is_skip_threshold_change(ctx))
>     {
>         ctx->update_progress_and_keepalive(ctx, ctx->write_location,
>                                            ctx->write_xid, ctx->did_write,
>                                            finished_xact);
>     }
> }

Since I want to keep the function update_progress_and_keepalive simple, I didn't
change it.

> ~~~
> 
> 4. StartupDecodingContext
> 
> @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
> XLogReaderRoutine *xl_routine,
> LogicalOutputPluginWriterPrepareWrite prepare_write,
> LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPluginWriterUpdateProgress update_progress)
> +   LogicalOutputPluginWriterUpdateProgressAndKeepalive
> update_progress_and_keepalive)
> 
> TBH, I find it confusing that the new parameter name
> ('update_progress_and_keepalive') is identical to the static function
> name in the same C source file. It introduces a kind of unnecessary
> shadowing and makes it harder to search/read the code.
> 
> I suggest just calling this param something unique and local to the
> function like 'do_update_keepalive'.
> 
> ~~~
> 5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
> XLogReaderRoutine *xl_routine,
> LogicalOutputPluginWriterPrepareWrite prepare_write,
> LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPluginWriterUpdateProgress update_progress)
> +   LogicalOutputPluginWriterUpdateProgressAndKeepalive
> update_progress_and_keepalive)
> 
> (Ditto previous comment #4)
> 
> TBH, I find it confusing that the new parameter name
> ('update_progress_and_keepalive') is identical to the static function
> name in the same C source file. It introduces a kind of unnecessary
> shadowing and makes it harder to search/read the code.
> 
> I suggest just calling this param something unique and local to the
> function like 'do_update_keepalive'.
> ~~~
> 
> 6. CreateDecodingContext
> 
> @@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
> XLogReaderRoutine *xl_routine,
> LogicalOutputPluginWriterPrepareWrite prepare_write,
> LogicalOutputPluginWriterWrite do_write,
> -   LogicalOutputPlug

RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-27 Thread Takamichi Osumi (Fujitsu)
Hi,


On Monday, February 27, 2023 6:30 PM wangw.f...@fujitsu.com 
 wrote:
> Attach the new patch.
Thanks for sharing v3. Minor review comments and question.


(1) UpdateDecodingProgressAndKeepalive header comment

The comment should probably be updated to explain why we reset some other
flags, as discussed in [1], and to describe briefly the function's
progress-update and keepalive behavior.

(2) OutputPluginPrepareWrite

Probably the changed error string is too wide.

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
     if (!ctx->accept_writes)
-        elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+        elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");

I think you could break the error message into two string literals on separate
lines, or rephrase it.

(3) Minor question

The patch introduced goto statements into the cb_wrapper functions. Is the
purpose to call update_progress_and_keepalive after popping the error context
stack, even if the corresponding callback is missing? I thought we could just
have an "else" clause for the callback-existence check, but did you choose the
current goto style for readability?

(4) Name of is_skip_threshold_change

I also feel the name of is_skip_threshold_change can be changed to 
"exceeded_keepalive_threshold" or something. Other candidates are proposed by 
Peter-san in [2].



[1] - https://www.postgresql.org/message-id/OS3PR01MB6275374EBE7C8CABBE6730099EAF9%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[2] - https://www.postgresql.org/message-id/CAHut%2BPt3ZEMo-KTF%3D5KJSU%2BHdWJD19GPGGCKOmBeM47484Ychw%40mail.gmail.com


Best Regards,
Takamichi Osumi





Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-27 Thread Peter Smith
Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

==
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only does that by replacing
it everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result seems no less code,
but it is less readable code now because you need to know what the
true/false parameter means. I wonder if it would have been better just
to leave this how it was.

==
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, which are guarding the
update_progress_and_keepalive() with the is_skip_threshold_change()
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, then I was thinking all those conditions maybe can be pushed
down *into* the wrapper, thereby making every calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
    if (!ctx->update_progress_and_keepalive)
        return;

    ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                       ctx->write_xid, ctx->did_write,
                                       finished_xact);
}
AFTER
{
    if (!ctx->update_progress_and_keepalive)
        return;

    if (finished_xact || is_skip_threshold_change(ctx))
    {
        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }
}


~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

5. @@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
-   LogicalOutputPluginWriterUpdateProgress update_progress)
+   LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)

(Ditto previous comment #4)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
  if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepte

RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-27 Thread wangw.f...@fujitsu.com
On Thur, Feb 23, 2023 at 18:41 PM Kuroda, Hayato/黒田 隼人
 wrote:
> Dear Wang,
> 
> Thank you for making the patch. IIUC, with your patch the output plugin
> basically no longer has to call UpdateProgress.

Thanks for your review and comments.

> I think the basic approach is as follows, is it right?
> 
> 1. In *_cb_wrapper, set ctx->did_write to false.
> 2. In OutputPluginWrite(), set ctx->did_write to true.
>    This means that changes are really written, not skipped.
> 3. At the end of the transaction, call update_progress_and_keepalive().
>    Even if we are not at the end, check the skipped count and call the
>    function if needed.
>    The counter will be reset if ctx->did_write is true or we exceed the
>    threshold.

Yes, you are right.
For the reset of the counter, please also refer to the reply to #05.
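
The three steps confirmed above can be sketched as a small state machine. This
is only a model under stated assumptions, not the patch: DemoDecodingState,
DEMO_SKIP_THRESHOLD and every function name are hypothetical, the threshold
value is made up, and the sketch assumes that a change which was really written
resets the counter without triggering an update. As described in the reply to
#05, the counter is deliberately left alone at the end of a transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed threshold; the real value is not given in this thread. */
#define DEMO_SKIP_THRESHOLD 100

typedef struct DemoDecodingState
{
    bool        did_write;          /* cleared per callback, set on write */
    int         skipped_changes;    /* changes skipped since the last reset */
    int         updates_called;     /* times the update function ran */
} DemoDecodingState;

/* Step 2: the write path marks that the change was really sent. */
static void
demo_plugin_write(DemoDecodingState *state)
{
    state->did_write = true;
}

/* Stand-in for update_progress_and_keepalive(). */
static void
demo_update(DemoDecodingState *state)
{
    state->updates_called++;
}

/* Models one *_cb_wrapper invocation for a single change. */
static void
demo_process_change(DemoDecodingState *state, bool plugin_writes,
                    bool end_of_xact)
{
    assert(state != NULL);

    state->did_write = false;       /* step 1 */

    if (plugin_writes)
        demo_plugin_write(state);   /* step 2 */

    if (end_of_xact)
    {
        /* Step 3: always update; the counter is intentionally not reset. */
        demo_update(state);
        return;
    }

    if (state->did_write)
    {
        /* Assumption: a real write resets the counter, no update needed. */
        state->skipped_changes = 0;
        return;
    }

    if (++state->skipped_changes >= DEMO_SKIP_THRESHOLD)
    {
        state->skipped_changes = 0;
        demo_update(state);
    }
}
```

In this model a long run of skipped changes still produces an update every
DEMO_SKIP_THRESHOLD changes, while a stale counter at a transaction boundary
can only delay the update in the next transaction, as the reply to #05 says.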

> The following are my comments. I apologize if I missed some previous discussions.
> 
> 01. logical.c
> 
> ```
> +static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
> +                                          bool finished_xact);
> +
> +static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
> ```
> 
> "struct" may not be needed.

Removed.

> 02. UpdateDecodingProgressAndKeepalive
> 
> I think the name should be UpdateDecodingProgressAndSendKeepalive(), since
> "keepalive" is not a verb.
> (But it's ok to ignore this if you prefer the shorter name.)
> The same can be said for the names of the datatype and the callback.

Yes, I prefer the shorter one. Otherwise, I think some names would be longer.

> 03. UpdateDecodingProgressAndKeepalive
> 
> ```
> +   /* set output state */
> +   ctx->accept_writes = false;
> +   ctx->write_xid = xid;
> +   ctx->write_location = lsn;
> +   ctx->did_write = false;
> ```
> 
> Do we have to modify accept_writes, write_xid, and write_location here?
> These values are not used in WalSndUpdateProgressAndKeepalive().

I think it might be better to set these three flags.
Since LogicalOutputPluginWriterUpdateProgressAndKeepalive is an open callback, I
think setting write_xid and write_location is not just for the function
WalSndUpdateProgressAndKeepalive. And I think setting accept_writes could
prevent some wrong usage.

> 04. stream_abort_cb_wrapper
> 
> ```
> +   update_progress_and_keepalive(ctx, true)
> ```
> 
> I'm not sure, but is it correct to call update_progress_and_keepalive() with
> finished_xact = true? Isn't it possible that a streamed sub-transaction is
> being aborted?

Fixed.

> 05. is_skip_threshold_change
> 
> At the end of the transaction, update_progress_and_keepalive() is called
> directly. Don't we have to reset change_count here?

I think this might complicate the function is_skip_threshold_change, so I didn't
reset the counter in this case.
I think the worst case of not resetting the counter is to delay sending the
keepalive message for the next transaction. But since the threshold we're using
is safe enough, it seems fine to me not to reset the counter in this case.
Added these related comments in the function is_skip_threshold_change.

> 06. ReorderBufferAbort
> 
> Suppose the top-level transaction is aborted. In that case,
> update_progress_and_keepalive() is called in stream_abort_cb_wrapper(), and
> then WalSndUpdateProgressAndKeepalive() is called at the end of
> ReorderBufferAbort(). Do we have to do it in both places?
> I think the one in stream_abort_cb_wrapper() may not be needed.

Yes, I think we only need one call for this case.
To make the behavior in *_cb_wrapper look consistent, I avoided the second call
for this case in the function ReorderBufferAbort.

> 07. WalSndUpdateProgress
> 
> You renamed ProcessPendingWrites() to WalSndSendPending(), but the name may
> still be strange because the function is called even if there are no pending
> writes.
>
> Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
> (at least) WalSndKeepaliveIfNecessary() in that case? Otherwise, a better name
> may be needed.

I think after sending the keepalive message (in WalSndKeepaliveIfNecessary), we
need to make sure the pending data is flushed through the loop.

Attached is the new patch.

Regards,
Wang wei


Attachment: v3-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-23 Thread Hayato Kuroda (Fujitsu)
Dear Wang,

Thank you for making the patch. IIUC, with your patch the output plugin
basically no longer has to call UpdateProgress.

I think the basic approach is as follows, is it right?

1. In *_cb_wrapper, set ctx->did_write to false.
2. In OutputPluginWrite(), set ctx->did_write to true.
   This means that changes are really written, not skipped.
3. At the end of the transaction, call update_progress_and_keepalive().
   Even if we are not at the end, check the skipped count and call the function
   if needed.
   The counter will be reset if ctx->did_write is true or we exceed the
   threshold.

The following are my comments. I apologize if I missed some previous discussions.

01. logical.c

```
+static void update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
+                                          bool finished_xact);
+
+static bool is_skip_threshold_change(struct LogicalDecodingContext *ctx);
```

"struct" may not be needed.

02. UpdateDecodingProgressAndKeepalive

I think the name should be UpdateDecodingProgressAndSendKeepalive(), since
"keepalive" is not a verb.
(But it's ok to ignore this if you prefer the shorter name.)
The same can be said for the names of the datatype and the callback.

03. UpdateDecodingProgressAndKeepalive

```
+   /* set output state */
+   ctx->accept_writes = false;
+   ctx->write_xid = xid;
+   ctx->write_location = lsn;
+   ctx->did_write = false;
```

Do we have to modify accept_writes, write_xid, and write_location here?
These values are not used in WalSndUpdateProgressAndKeepalive().

04. stream_abort_cb_wrapper

```
+   update_progress_and_keepalive(ctx, true)
```

I'm not sure, but is it correct to call update_progress_and_keepalive() with
finished_xact = true? Isn't it possible that a streamed sub-transaction is
being aborted?


05. is_skip_threshold_change

At the end of the transaction, update_progress_and_keepalive() is called
directly. Don't we have to reset change_count here?

06. ReorderBufferAbort

Suppose the top-level transaction is aborted. In that case,
update_progress_and_keepalive() is called in stream_abort_cb_wrapper(), and
then WalSndUpdateProgressAndKeepalive() is called at the end of
ReorderBufferAbort(). Do we have to do it in both places?
I think the one in stream_abort_cb_wrapper() may not be needed.

07. WalSndUpdateProgress

You renamed ProcessPendingWrites() to WalSndSendPending(), but it may still be
strange because it will be called even if there are no pending writes.

Isn't it sufficient to call ProcessRepliesIfAny(), WalSndCheckTimeOut() and
(at least) WalSndKeepaliveIfNecessary() in that case? Or a better name may be
needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-22 Thread wangw.f...@fujitsu.com
On Sun, Feb 19, 2023 at 21:06 PM Wang, Wei/王 威  wrote:
> On Thur, Feb 14, 2023 at 2:03 AM Andres Freund  wrote:
> > On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > > > The patch calls update_progress in change_cb_wrapper and other
> > > > > wrappers which will miss the case of DDLs that generates a lot of data
> > > > > that is not processed by the plugin. I think for that we either need
> > > > > to call update_progress from reorderbuffer.c similar to what the patch
> > > > > has removed or we need some other way to address it. Do you have any
> > > > > better idea?
> > > >
> > > > > I don't mind calling something like update_progress() in the specific
> > > > > cases that's needed, but I think those are just the
> > > > >   if (!RelationIsLogicallyLogged(relation))
> > > > >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> > > > >
> > > > > To me it makes a lot more sense to call update_progress() for those,
> > > > > rather than generally.
> > > >
> > >
> > > Won't it be better to call it wherever we don't invoke any wrapper
> > > function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> > > changes, etc.? I was thinking that wherever we don't call the wrapper
> > > function which means we don't have a chance to invoke
> > > update_progress(), the timeout can happen if there are a lot of such
> > > messages.
> >
> > ISTM that the likelihood of causing harm due to increased overhead is higher
> > than the gain.
> 
> I would like to do something for this thread. So, I am planning to update the
> patch as per discussion in the email chain unless someone is already working
> on it.

Thanks to Andres and Amit for the discussion.

Based on the discussion and Andres' WIP (in [1]), I made the following
modifications:
1. Renamed some functions.
2. Added the threshold-related logic in the function
update_progress_and_keepalive.
3. Added the timeout-related processing of temporary data and
unlogged/foreign/system tables in the function ReorderBufferProcessTXN.
4. Improved error messages in the function OutputPluginPrepareWrite.
5. Invoked function update_progress_and_keepalive to fix sync-related problems
caused by filters such as origin in functions DecodeCommit(), DecodePrepare()
and ReorderBufferAbort();
6. Removed the invocation of function update_progress_and_keepalive in the
function begin_prepare_cb_wrapper().
7. Invoked the function update_progress_and_keepalive() in the function
stream_truncate_cb_wrapper(), just like we do in the function
truncate_cb_wrapper().
8. Removed the check of SyncRepRequested() in the syncrep logic in the function
WalSndUpdateProgressAndKeepAlive();
9. Added the check for wal_sender_timeout before using it in functions
WalSndUpdateProgressAndKeepAlive() and WalSndWriteData();

Attached is the new patch.

[1] - https://www.postgresql.org/message-id/20230208200235.esfoggsmuvf4pugt%40awork3.anarazel.de

Regards,
Wang wei


Attachment: v2-0001-Rework-LogicalOutputPluginWriterUpdateProgress.patch


RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-19 Thread wangw.f...@fujitsu.com
On Thur, Feb 14, 2023 at 2:03 AM Andres Freund  wrote:
> On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > > The patch calls update_progress in change_cb_wrapper and other
> > > > wrappers which will miss the case of DDLs that generates a lot of data
> > > > that is not processed by the plugin. I think for that we either need
> > > > to call update_progress from reorderbuffer.c similar to what the patch
> > > > has removed or we need some other way to address it. Do you have any
> > > > better idea?
> > >
> > > > I don't mind calling something like update_progress() in the specific
> > > > cases that's needed, but I think those are just the
> > > >   if (!RelationIsLogicallyLogged(relation))
> > > >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> > > >
> > > > To me it makes a lot more sense to call update_progress() for those,
> > > > rather than generally.
> > >
> >
> > Won't it be better to call it wherever we don't invoke any wrapper
> > function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> > changes, etc.? I was thinking that wherever we don't call the wrapper
> > function which means we don't have a chance to invoke
> > update_progress(), the timeout can happen if there are a lot of such
> > messages.
> 
> ISTM that the likelihood of causing harm due to increased overhead is higher
> than the gain.

I would like to do something for this thread. So, I am planning to update the
patch as per discussion in the email chain unless someone is already working on
it.

Regards,
Wang wei


Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-13 Thread Andres Freund
Hi,

On 2023-02-13 14:06:57 +0530, Amit Kapila wrote:
> > > The patch calls update_progress in change_cb_wrapper and other
> > > wrappers which will miss the case of DDLs that generates a lot of data
> > > that is not processed by the plugin. I think for that we either need
> > > to call update_progress from reorderbuffer.c similar to what the patch
> > > has removed or we need some other way to address it. Do you have any
> > > better idea?
> >
> > I don't mind calling something like update_progress() in the specific cases
> > that's needed, but I think those are just the
> >   if (!RelationIsLogicallyLogged(relation))
> > >   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
> >
> > To me it makes a lot more sense to call update_progress() for those, rather
> > than generally.
> >
> 
> Won't it be better to call it wherever we don't invoke any wrapper
> function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
> changes, etc.? I was thinking that wherever we don't call the wrapper
> function which means we don't have a chance to invoke
> update_progress(), the timeout can happen if there are a lot of such
> messages.

ISTM that the likelihood of causing harm due to increased overhead is higher
than the gain.

Greetings,

Andres Freund




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-13 Thread Andres Freund
Hi,

On 2023-02-13 08:22:34 +0530, Amit Kapila wrote:
> On Sat, Feb 11, 2023 at 3:04 AM Andres Freund  wrote:
> >
> > > One difference I see with the patch is that I think we will end up
> > > sending keepalive for empty prepared transactions even though we don't
> > > skip sending begin/prepare messages for those.
> >
> > With the proposed approach we reliably know whether a callback wrote
> > something, so we can tune the behaviour here fairly easily.
> >
> 
> I would like to clarify a few things about the proposed approach. In
> commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
> ctx->did_write = false;, then call the commit/prepare callback (which
> will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
> update_progress() which will make decisions based on ctx->did_write
> flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
> should know that the transaction has performed some writes before that
> call which is currently working because pgoutput is tracking the same
> via sent_begin_txn.

I don't really see these as being related. What pgoutput does internally to
optimize for some usecases shouldn't matter to the larger infrastructure.


> Is the intention here that we still track whether BEGIN () has been sent via
> pgoutput?

Yes. If somebody later wants to propose tracking this alongside a txn and
passing that to the output plugin callbacks, we can do that. But that's
independent of fixing the broken architecture of the progress infrastructure.

Greetings,

Andres Freund




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-13 Thread Amit Kapila
On Sat, Feb 11, 2023 at 2:34 AM Andres Freund  wrote:
>
> On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> > On Thu, Feb 9, 2023 at 1:33 AM Andres Freund  wrote:
> > >
> > > > Hacking on a rough prototype how I think this should rather look, I had a
> > > > few questions / remarks:
> > > >
> > > > - We probably need to call UpdateProgress from a bunch of places in
> > > >   decode.c as well? Indicating that we're lagging by a lot, just because
> > > >   all transactions were in another database seems decidedly suboptimal.
> > >
> >
> > We can do that but I think in all those cases we will reach quickly
> > enough back to walsender logic (WalSndLoop - that will send keepalive
> > if required) that we don't need to worry. After processing each
> > record, the logic will return back to the main loop that will send
> > keepalive if required.
>
> For keepalive processing yes, for syncrep and accurate lag tracking, I don't
> think that suffices?  We could do that in WalSndLoop() instead I guess, but
> we'd have more information about when that's useful in decode.c.
>

Yeah, I think one possibility to address that is to call
update_progress() in DecodeCommit() and friends when we need to skip
the xact. We decide that in DecodeTXNNeedSkip. In the checks in that
function, I am not sure whether we need to call it for the case where
we skip the xact because we decide that it was previously decoded.

>
> > The patch calls update_progress in change_cb_wrapper and other
> > wrappers which will miss the case of DDLs that generates a lot of data
> > that is not processed by the plugin. I think for that we either need
> > to call update_progress from reorderbuffer.c similar to what the patch
> > has removed or we need some other way to address it. Do you have any
> > better idea?
>
> I don't mind calling something like update_progress() in the specific cases
> that's needed, but I think those are just the
>   if (!RelationIsLogicallyLogged(relation))
>   if (relation->rd_rel->relrewrite && !rb->output_rewrites)
>
> To me it makes a lot more sense to call update_progress() for those, rather
> than generally.
>

Won't it be better to call it wherever we don't invoke any wrapper
function like for cases REORDER_BUFFER_CHANGE_INVALIDATION, sequence
changes, etc.? I was thinking that wherever we don't call the wrapper
function which means we don't have a chance to invoke
update_progress(), the timeout can happen if there are a lot of such
messages.

>
> I think, independent of the update_progress calls, it'd be worth investing a
> bit of time into optimizing those cases, so that we don't put the changes into
> the reorderbuffer in the first place.  I think we can find space for two flag
> bits to identify the cases in the WAL, rather than needing to access the
> catalog to figure it out.  If we don't find space, we could add an annotation
> to the WAL record (making it bigger) for the two cases, because they're not
> the path most important to optimize.
>
>
>
> > > > - Why should lag tracking only be updated at commit like points? That
> > > >   seems like it adds odd discontinuities?
> > >
> >
> > We have previously experimented to call it from non-commit locations
> > but that turned out to give inaccurate information about Lag. See
> > email [1].
>
> That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
> reporting something more frequently.  ISTM that
> WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
> update lag reporting for records that don't strictly need it. I think that
> decision should be made based on the LSN, and be deterministic.
>
>
> > > - Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
> > >   WalSndWriteData() missing wal_sender_timeout <= 0 checks?
> > >
> >
> > It seems we are checking that via
> > ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
> > need to check it before as well?
>
> Either we don't need the precheck at all, or we should do it reliably. Right
> now we'll have a higher overhead / some behavioural changes, if
> wal_sender_timeout is disabled. That doesn't make sense.
>

Fair enough, we can probably do it earlier.

>
> > How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
> > WalSndWaitToSendPendingWrites?
>
> I don't like those much:
>
> We're not really waiting for the data to be sent or such, we just want to give
> it to the kernel to be sent out. Contrast that to WalSndWaitForWal, where we
> actually are waiting for something to complete.
>
> I don't think 'write' is a great description either, although our existing
> terminology is somewhat muddled. We keep calling pq_flush() until
> !pq_is_send_pending().
>
> WalSndSendPending() or WalSndFlushPending()?
>

Either of those sounds fine.

-- 
With Regards,
Amit Kapila.




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-12 Thread Amit Kapila
On Sat, Feb 11, 2023 at 3:04 AM Andres Freund  wrote:
>
> > One difference I see with the patch is that I think we will end up
> > sending keepalive for empty prepared transactions even though we don't
> > skip sending begin/prepare messages for those.
>
> With the proposed approach we reliably know whether a callback wrote
> something, so we can tune the behaviour here fairly easily.
>

I would like to clarify a few things about the proposed approach. In
commit_cb_wrapper()/prepare_cb_wrapper(), the patch first did
ctx->did_write = false;, then call the commit/prepare callback (which
will call pgoutput_commit_txn()/pgoutput_prepare_txn()) and then call
update_progress() which will make decisions based on ctx->did_write
flag. Now, for this to work pgoutput_commit_txn/pgoutput_prepare_txn
should know that the transaction has performed some writes before that
call which is currently working because pgoutput is tracking the same
via sent_begin_txn. Is the intention here that we still track whether
BEGIN () has been sent via pgoutput?

-- 
With Regards,
Amit Kapila.




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-10 Thread Andres Freund
Hi,

Replying on the new thread. Original message at
https://www.postgresql.org/message-id/CAA4eK1%2BH2m95HhzfpRkwv2-GtFwtbcVp7837X49%2Bvs0RXX3dBA%40mail.gmail.com


On 2023-02-09 15:54:19 +0530, Amit Kapila wrote:
> One thing to note about the changes we are discussing here is that
> some of the plugins like wal2json already call
> OutputPluginUpdateProgress in their commit callback. They may need to
> update it accordingly.

It was a fundamental mistake to add OutputPluginUpdateProgress(). I don't like
causing unnecessary breakage, but this seems necessary.


> One difference I see with the patch is that I think we will end up
> sending keepalive for empty prepared transactions even though we don't
> skip sending begin/prepare messages for those.

With the proposed approach we reliably know whether a callback wrote
something, so we can tune the behaviour here fairly easily.

Likely WalSndUpdateProgress() should not do anything if
  did_write && !finished_xact.
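
Roughly, as a sketch only (the struct and function names here are made up for
illustration and are not from the prototype):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, reduced view of the state the walsender callback would see. */
typedef struct SketchProgressState
{
    bool did_write;      /* the plugin callback wrote data to the client */
    bool finished_xact;  /* we are at a commit/abort/prepare-like point */
} SketchProgressState;

/* Returns true when progress/keepalive handling is worth doing at all. */
static bool
sketch_progress_work_needed(const SketchProgressState *state)
{
    assert(state != NULL);

    /* Data just went out mid-transaction: the receiver has already seen
     * activity, so there is nothing extra to do. */
    if (state->did_write && !state->finished_xact)
        return false;

    return true;
}
```

Every other combination (nothing written, or a finished transaction) still
reaches the progress logic.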


> The reason why we don't skip sending prepare for empty 2PC xacts is that if
> the WALSender restarts after the PREPARE of a transaction and before the
> COMMIT PREPARED of the same transaction then we won't be able to figure out
> if we have skipped sending BEGIN/PREPARE of a transaction.

It's probably not a good idea to skip sending 2PC state changes anyway, at
least when used for replication, rather than CDC type use cases.

But I again think that that's not something the core system can assume.

I'm sad that we went so far down a pretty obviously bad rabbit hole.  Adding
incrementally more of the progress calls to pgoutput, and knowing that
wal2json also added some, should have rung some pretty large alarm bells.


> To skip sending prepare for empty xacts, we previously thought of some ideas
> like (a) At commit-prepare time have a check on the subscriber-side to know
> whether there is a corresponding prepare for it before actually doing
> commit-prepare but that sounded costly. (b) somehow persist the information
> whether the PREPARE for a xact is already sent and then use that information
> for commit prepared but again that also didn't sound like a good idea.

I don't think it's worth optimizing this. However, the explanation for why
we're not skipping empty prepared xacts needs to be added to
pgoutput_prepare_txn() etc.

Greetings,

Andres Freund




Re: Rework LogicalOutputPluginWriterUpdateProgress

2023-02-10 Thread Andres Freund
Hi,

This is a reply to:
https://www.postgresql.org/message-id/CAA4eK1%2BDB66cYRRVyGcaMm7%2BtQ_u%3Dq%3D%2BHWGjpu9X0pqMFWbsZQ%40mail.gmail.com
split off, so patches to address some of my concerns don't confuse cfbot.


On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> On Thu, Feb 9, 2023 at 1:33 AM Andres Freund  wrote:

> > Attached is a current, quite rough, prototype. It addresses some of the
> > points raised, but far from all. There's also several XXXs/FIXMEs in it.  I
> > changed the file-ending to .txt to avoid hijacking the CF entry.
> >
> 
> I have started a separate thread to avoid such confusion. I hope that
> is fine with you.

In abstract, yes - unfortunately just changing the subject isn't going to
suffice, I'm afraid. The In-Reply-To header was still referencing the old
thread.  The mail archive did see the threads as one, and I think that's what
cfbot uses as the source.


On 2023-02-09 11:21:41 +0530, Amit Kapila wrote:
> On Thu, Feb 9, 2023 at 1:33 AM Andres Freund  wrote:
> >
> > Hacking on a rough prototype how I think this should rather look, I had a
> > few questions / remarks:
> >
> > - We probably need to call UpdateProgress from a bunch of places in decode.c
> >   as well? Indicating that we're lagging by a lot, just because all
> >   transactions were in another database seems decidedly suboptimal.
> >
> 
> We can do that but I think in all those cases we will reach quickly
> enough back to walsender logic (WalSndLoop - that will send keepalive
> if required) that we don't need to worry. After processing each
> record, the logic will return back to the main loop that will send
> keepalive if required.

For keepalive processing yes, for syncrep and accurate lag tracking, I don't
think that suffices?  We could do that in WalSndLoop() instead I guess, but
we'd have more information about when that's useful in decode.c.


> Also, while reading WAL if we need to block, it will call WalSndWaitForWal()
> which will send keepalive if required.

The fast-path prevents WalSndWaitForWal() from doing that in a lot of cases.

    /*
     * Fast path to avoid acquiring the spinlock in case we already know we
     * have enough WAL available. This is particularly interesting if we're
     * far behind.
     */
    if (RecentFlushPtr != InvalidXLogRecPtr &&
        loc <= RecentFlushPtr)
        return RecentFlushPtr;


> The patch calls update_progress in change_cb_wrapper and other
> wrappers which will miss the case of DDLs that generates a lot of data
> that is not processed by the plugin. I think for that we either need
> to call update_progress from reorderbuffer.c similar to what the patch
> has removed or we need some other way to address it. Do you have any
> better idea?

I don't mind calling something like update_progress() in the specific cases
that's needed, but I think those are just the
  if (!RelationIsLogicallyLogged(relation))
  if (relation->rd_rel->relrewrite && !rb->output_rewrites)

To me it makes a lot more sense to call update_progress() for those, rather
than generally.


I think, independent of the update_progress calls, it'd be worth investing a
bit of time into optimizing those cases, so that we don't put the changes into
the reorderbuffer in the first place.  I think we can find space for two flag
bits to identify the cases in the WAL, rather than needing to access the catalog
to figure it out.  If we don't find space, we could add an annotation to the WAL
record (making it bigger) for the two cases, because they're not the path most
important to optimize.



> > - Why should lag tracking only be updated at commit like points? That seems
> >   like it adds odd discontinuities?
> >
> 
> We have previously experimented to call it from non-commit locations
> but that turned out to give inaccurate information about Lag. See
> email [1].

That seems like an issue with WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, not with
reporting something more frequently.  ISTM that
WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS just isn't a good proxy for when to
update lag reporting for records that don't strictly need it. I think that
decision should be made based on the LSN, and be deterministic.
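
One possible shape for such an LSN-based decision, purely as an illustration:
the byte interval, the type alias and the function name below are assumptions
made up for this sketch, not something that exists in the tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t SketchLsn;

/* Hypothetical interval: track lag once per this many bytes of WAL. */
#define SKETCH_LAG_TRACK_INTERVAL_BYTES ((SketchLsn) 1024 * 1024)

/*
 * Decide whether to record a lag-tracking sample for a record that does not
 * strictly need one. The decision depends only on how far the LSN has
 * advanced since the last sample, so it is deterministic for a given WAL
 * stream rather than dependent on wall-clock timing.
 */
static bool
sketch_should_track_lag(SketchLsn last_tracked, SketchLsn current)
{
    assert(current >= last_tracked);

    return current - last_tracked >= SKETCH_LAG_TRACK_INTERVAL_BYTES;
}
```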


> > - Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
> >   WalSndWriteData() missing wal_sender_timeout <= 0 checks?
> >
> 
> It seems we are checking that via
> ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
> need to check it before as well?

Either we don't need the precheck at all, or we should do it reliably. Right
now we'll have a higher overhead / some behavioural changes, if
wal_sender_timeout is disabled. That doesn't make sense.
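
A reliable precheck could look something like the sketch below. The function
name and the millisecond timestamps are simplifications invented for
illustration; only the "timeout <= 0 means disabled" guard and the
"timeout / 2" comparison come from the discussion above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Returns true when half of the timeout has elapsed since the last reply,
 * i.e. when it is worth entering the keepalive/pending-write path.
 * Timestamps are plain milliseconds here to keep the sketch self-contained.
 */
static bool
sketch_keepalive_due(int64_t now_ms, int64_t last_reply_ms, int timeout_ms)
{
    assert(now_ms >= last_reply_ms);

    /* A timeout of zero or less means the timeout is disabled: skip the
     * check entirely instead of comparing against "timeout / 2". */
    if (timeout_ms <= 0)
        return false;

    return now_ms - last_reply_ms >= timeout_ms / 2;
}
```

Without the guard, a disabled timeout makes `timeout_ms / 2` zero, so the
comparison is always true and the slow path is taken on every call.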


> > - I don't really understand why f95d53edged55 added !end_xact to the if
> >   condition for ProcessPendingWrites(). Is the theory that we'll end up in
> >   an outer loop soon?
> >
> 
> Yes. For non-empty xacts, we will anyway send a commit message. For
> empty (skipped) x

Re: Rework LogicalOutputPluginWriterUpdateProgress (WAS Re: Logical replication timeout ...)

2023-02-09 Thread Amit Kapila
On Thu, Feb 9, 2023 at 11:21 AM Amit Kapila  wrote:
>
>
> How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
> WalSndWaitToSendPendingWrites?
>

How about renaming WalSndUpdateProgress() to
WalSndUpdateProgressAndSendKeepAlive() or
WalSndUpdateProgressAndKeepAlive()?

One thing to note about the changes we are discussing here is that
some of the plugins like wal2json already call
OutputPluginUpdateProgress in their commit callback. They may need to
update it accordingly.

One difference I see with the patch is that I think we will end up
sending keepalive for empty prepared transactions even though we don't
skip sending begin/prepare messages for those. The reason why we don't
skip sending prepare for empty 2PC xacts is that if the WALSender
restarts after the PREPARE of a transaction and before the COMMIT
PREPARED of the same transaction then we won't be able to figure out
if we have skipped sending BEGIN/PREPARE of a transaction. To skip
sending prepare for empty xacts, we previously thought of some ideas
like (a) At commit-prepare time have a check on the subscriber-side to
know whether there is a corresponding prepare for it before actually
doing commit-prepare but that sounded costly. (b) somehow persist the
information whether the PREPARE for a xact is already sent and then
use that information for commit prepared but again that also didn't
sound like a good idea.

-- 
With Regards,
Amit Kapila.




Rework LogicalOutputPluginWriterUpdateProgress (WAS Re: Logical replication timeout ...)

2023-02-08 Thread Amit Kapila
On Thu, Feb 9, 2023 at 1:33 AM Andres Freund  wrote:
>
> Hacking on a rough prototype how I think this should rather look, I had a few
> questions / remarks:
>
> - We probably need to call UpdateProgress from a bunch of places in decode.c
>   as well? Indicating that we're lagging by a lot, just because all
>   transactions were in another database seems decidedly suboptimal.
>

We can do that but I think in all those cases we will reach quickly
enough back to walsender logic (WalSndLoop - that will send keepalive
if required) that we don't need to worry. After processing each
record, the logic will return back to the main loop that will send
keepalive if required. Also, while reading WAL if we need to block, it
will call WalSndWaitForWal() which will send keepalive if required.
The real problem we have seen in the field reports or tests is that
when we process a large transaction where changes are queued in the
reorderbuffer and while processing those we discard all or most of the
changes.

The patch calls update_progress in change_cb_wrapper and other
wrappers which will miss the case of DDLs that generates a lot of data
that is not processed by the plugin. I think for that we either need
to call update_progress from reorderbuffer.c similar to what the patch
has removed or we need some other way to address it. Do you have any
better idea?

> - Why should lag tracking only be updated at commit like points? That seems
>   like it adds odd discontinuities?
>

We have previously experimented to call it from non-commit locations
but that turned out to give inaccurate information about Lag. See
email [1].

> - The mix of skipped_xact and ctx->end_xact in WalSndUpdateProgress() seems
>   somewhat odd. They have very overlapping meanings IMO.
>
> - there's no UpdateProgress calls in pgoutput_stream_abort(), but ISTM there
>   should be? It's legit progress.
>

Agreed with both of the above points.

> - That's from 6912acc04f0: I find LagTrackerRead(), LagTrackerWrite() quite
>   confusing, naming-wise. IIUC "reading" is about receiving confirmation
>   messages, "writing" about the time the record was generated.  ISTM that the
>   current time is a quite poor approximation in XLogSendPhysical(), but pretty
>   much meaningless in WalSndUpdateProgress()? Am I missing something?
>

Leaving it for Thomas to answer.

> - Aren't the wal_sender_timeout / 2 checks in WalSndUpdateProgress(),
>   WalSndWriteData() missing wal_sender_timeout <= 0 checks?
>

It seems we are checking that via
ProcessPendingWrites()->WalSndKeepaliveIfNecessary(). Do you think we
need to check it before as well?

> - I don't really understand why f95d53edged55 added !end_xact to the if
>   condition for ProcessPendingWrites(). Is the theory that we'll end up in an
>   outer loop soon?
>

Yes. For non-empty xacts, we will anyway send a commit message. For
empty (skipped) xacts, we will send for synchronous replication case
to avoid any delay.

>
> Attached is a current, quite rough, prototype. It addresses some of the points
> raised, but far from all. There's also several XXXs/FIXMEs in it.  I changed
> the file-ending to .txt to avoid hijacking the CF entry.
>

I have started a separate thread to avoid such confusion. I hope that
is fine with you.

> > > I don't think the syncrep logic in WalSndUpdateProgress really works
> > > as-is - consider what happens if e.g. the origin filter filters out entire
> > > transactions. We'll afaics never get to WalSndUpdateProgress(). In some
> > > cases we'll be lucky because we'll return quickly to XLogSendLogical(), but
> > > not reliably.
> >

Which case are you worried about? As mentioned in one of the previous
points I thought the timeout/keepalive handling in the callers should
be enough.

> > Is it actually the right thing to check SyncRepRequested() in that logic?
> > It's quite common to set up syncrep so that individual users or transactions
> > opt into syncrep, but to leave the default disabled.
> >
> > I don't really see an alternative to making this depend solely on
> > sync_standbys_defined.

Fair point.

How about renaming ProcessPendingWrites to WaitToSendPendingWrites or
WalSndWaitToSendPendingWrites?

[1] - https://www.postgresql.org/message-id/OS3PR01MB62755D216245199554DDC8DB9EEA9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

-- 
With Regards,
Amit Kapila.