RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-19 Thread kuroda.hay...@fujitsu.com
Dear Hou,

Thanks for updating the patch! The following are my comments.

===
01. applyparallelworker.c - SIZE_STATS_MESSAGE

```
/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```

According to other comment styles, it seems that the first sentence of the
comment should describe the datatype and usage, not the detailed reason.
For example, for ParallelApplyWorkersList you wrote "A list ...". How about
adding a message like the following:
The message size that can be skipped by the parallel apply worker
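
Applied to the macro, it would look something like (just a sketch of the
suggested wording):

```
/* The message size that can be skipped by the parallel apply worker */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```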


~~~
02. applyparallelworker.c - parallel_apply_start_subtrans

```
if (current_xid != top_xid &&
!list_member_xid(subxactlist, current_xid))
```

A macro TransactionIdEquals is defined in access/transam.h. Should we use it, 
or is it too trivial?
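
For reference, the condition rewritten with the macro would be (untested sketch):

```
if (!TransactionIdEquals(current_xid, top_xid) &&
	!list_member_xid(subxactlist, current_xid))
```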


~~~
03. applyparallelworker.c - LogicalParallelApplyLoop

```
case SHM_MQ_WOULD_BLOCK:
	{
		int			rc;

		if (!in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();
		}

		MemoryContextReset(ApplyMessageContext);
```

Is MemoryContextReset() needed? IIUC no one uses ApplyMessageContext if we 
reach here.


~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
else if (res == SHM_MQ_SUCCESS)
{
StringInfoData msg;

initStringInfo(&msg);
appendBinaryStringInfo(&msg, data, nbytes);
HandleParallelApplyMessage(winfo, &msg);
pfree(msg.data);
}
```

In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used;
the StringInfoData is initialized directly instead. Why is there a difference?
appendBinaryStringInfo() will do repalloc() and memcpy(), so it may be inefficient.


~~~
05. applyparallelworker.c - parallel_apply_send_data

```
if (result != SHM_MQ_SUCCESS)
	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("could not send data to shared-memory queue")));
```

I checked the enumeration of shm_mq_result, and it seems that
shm_mq_send(nowait = false) fails only when the opposite process has exited.
How about adding a hint or detail message like "lost connection to the
parallel apply worker"?
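
For example (the exact wording is just a suggestion):

```
if (result != SHM_MQ_SUCCESS)
	ereport(ERROR,
			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			 errmsg("could not send data to shared-memory queue"),
			 errdetail("The parallel apply worker may have exited.")));
```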


===
06. worker.c - nchanges

```
/*
 * The number of changes sent to parallel apply workers during one streaming
 * block.
 */
static uint32 nchanges = 0;
```

I found that the name "nchanges" is already used in apply_spooled_messages().
It happens to work because the local variable always shadows the global one
when their names collide, but I think it may be confusing.


~~~
07. worker.c - apply_handle_commit_internal

I think we can add an assertion like Assert(replorigin_session_origin_lsn !=
InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId)
to avoid a missing replorigin_session_setup(). Previously it was set at the
entry point and never reset.
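
That is, something like:

```
/* replorigin_session_setup() must have been called by now */
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr &&
	   replorigin_session_origin != InvalidRepOriginId);
```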


~~~
08. worker.c - apply_handle_prepare_internal

Same as above.


~~~
09. worker.c - maybe_reread_subscription

```
/*
 * Exit if any parameter that affects the remote connection was changed.
 * The launcher will start a new worker.
 */
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
strcmp(newsub->origin, MySubscription->origin) != 0 ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
(errmsg("logical replication apply worker for 

RE: TRAP: FailedAssertion("prev_first_lsn < cur_txn->first_lsn", File: "reorderbuffer.c", Line: 927, PID: 568639)

2022-10-17 Thread kuroda.hay...@fujitsu.com
Dear Sawada-san, Amit,

> IIUC Change-2 is required in v16 and HEAD but not mandatory in v15 and
> v14. The reason why we need Change-2 is that there is a case where we
> mark only subtransactions as containing catalog change while not doing
> that for its top-level transaction. In v15 and v14, since we mark both
> subtransactions and top-level transaction in
> SnapBuildXidSetCatalogChanges() as containing catalog changes, we
> don't get the assertion failure at "Assert(!needs_snapshot ||
> needs_timetravel)".

Incidentally, I agreed that Change-2 is needed for HEAD (and v16), not v15 and 
v14.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-17 Thread kuroda.hay...@fujitsu.com
Dear Osumi-san,

> I mainly followed the steps there and
> replaced the command "SELECT" for the remote table at 6-9 with "INSERT"
> command.
> Then, after waiting for few seconds, the "COMMIT" succeeded like below output,
> even after the server stop of the worker side.

> Additionally, the last reference "SELECT" which failed above can succeed,
> if I restart the worker server before the "SELECT" command to the remote 
> table.
> This means the transaction looks successful but the data isn't there ?
> Could you please have a look at this issue ?

Good catch. This occurred because we disconnected the crashed server.
 
Previously the failed server had been disconnected in
pgfdw_connection_check().
That was OK as long as the transaction (or statement) was surely canceled.
But currently the statement might not be canceled because QueryCancelPending
might be cleaned up [1].

If we fail to cancel statements and reach pgfdw_xact_callback(),
the connection will not be used because entry->conn is NULL.
That's why your "COMMIT" succeeded.
However, the backend did not send the "COMMIT" command to the server,
so the inserted data vanished on the remote server.

I understood that we should not call disconnect_pg_server(entry->conn) even if
we detect the disconnection.
It should be controlled in the Xact callback. This will be modified in the
next version.
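
One possible shape (just a sketch; postgres_fdw's ConnCacheEntry already has an
"invalidated" flag, and "connection_is_broken" is a hypothetical name):

```
/* In pgfdw_connection_check(): do not drop the connection here. */
if (connection_is_broken)
	entry->invalidated = true;	/* let pgfdw_xact_callback() clean it up */
```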

Moreover, I noticed that we should re-enable the timer even if
QueryCancelMessage is not NULL.
If we do not turn it on here, the checking will never be enabled again.


[1]: 
https://www.postgresql.org/message-id/TYAPR01MB5866CE34430424A588F60FC2F55D9%40TYAPR01MB5866.jpnprd01.prod.outlook.com


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-17 Thread kuroda.hay...@fujitsu.com
Dear Horiguchi-san,

> Might be on slight different direction, but it looks to me a bit too
> much to use WaitEventSet to check only if a socket is live or not.
> 
> A quick search in the tree told me that we could use pqSocketCheck()
> instead, and I think it would be the something that "could potentially
> go into libpq-fe.h" as Önder mentioned, if I understand what he said
> correctly.

Based on your suggestion, I tried to add a function like the following to fe-misc.c:

```
int
PQconncheck(PGconn *conn)
{
	/* Raise an ERROR if invalid socket has come */
	if (conn == NULL ||
		PQsocket(conn) == PGINVALID_SOCKET)
		return -1;

	return pqSocketCheck(conn, 1, 1, -1);
}
```

... and replaced pgfdw_connection_check_internal() with PQconncheck(),
but it did not work well.
To be exact, pqSocketCheck() said the socket was "readable" and "writable"
even after the connection had been killed.

IIUC, pqSocketCheck() calls pqSocketPoll(),
and in pqSocketPoll() we poll() the POLLIN or POLLOUT events.
But according to [1], we must wait for the POLLRDHUP event,
so we cannot reuse it straightforwardly.
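
A minimal sketch of the kind of check that seems to be needed instead
(Linux-specific; POLLRDHUP requires _GNU_SOURCE, and this is untested):

```
#define _GNU_SOURCE
#include <poll.h>

/* Returns 1 if the peer has closed the connection, 0 if alive, -1 on error. */
static int
peer_closed(int sock)
{
	struct pollfd pfd;

	pfd.fd = sock;
	pfd.events = POLLRDHUP;		/* wake when the peer shuts down its end */
	pfd.revents = 0;

	if (poll(&pfd, 1, 0) < 0)
		return -1;

	return (pfd.revents & (POLLRDHUP | POLLHUP | POLLERR)) != 0;
}
```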

If we really want to move the checking function anyway,
we must follow AddWaitEventToSet() and some of the WaitEventAdjust functions.
I'm not sure whether that is really good.

[1]: https://man7.org/linux/man-pages/man2/poll.2.html

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-17 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thanks for giving suggestions!

> Still, the reestablish mechanism can be further simplified with
> WL_SOCKET_CLOSED event such as the following (where we should probably
> rename pgfdw_connection_check_internal):

Sounds reasonable.
I think it may be included in this patch. I will try it in the next (or a later) version.

> In other words, a variation of pgfdw_connection_check_internal()
> could potentially go into interfaces/libpq/libpq-fe.h
> (backend/libpq/pqcomm.c or src/interfaces/libpq/fe-connect.c).

Hmm, IIUC libpq-related functions and data structures cannot be accessed from
the core source,
so we cannot move it to pqcomm.c.
(This is the motivation for introducing the libpqwalreceiver library; it is
used to avoid linking libpq directly.)
And functions in libpq/fe-connect.c will be included in libpq.so,
but latch-related functions like WaitEventSetWait() should not be called from
a client application.
So that is also not appropriate.
In short, there is no good place for the function because it requires both
libpq and core functions.


> I still think that it is probably too much work/code to detect the
> mentioned use-case you described on [1]. Each backend constantly
> calling CallCheckingRemoteServersCallbacks() for this purpose doesn't sound
> the optimal way to approach the "check whether server down" problem. You
> typically try to decide whether a server is down by establishing a
> connection (or ping etc), not going over all the existing connections.

Yes, the approach of establishing a new connection is very simple, but I think
it has some holes.
For example, if the DNS server or some routing software is stopped,
we will fail to connect to foreign servers.
In your approach, we regard that case as "failed" and try to invalidate the
server
even if the existing connection can still be used.
Another case: if a server goes down and failover has occurred, we will succeed
in connecting to the foreign server.
We may regard that case as a "success", but we cannot COMMIT the transaction.


> As far as I can think of, it should probably be a single background task
> checking whether the server is down. If so, sending an invalidation message
> to all the backends such that related backends could act on the
> invalidation and throw an error. This is to cover the use-case you
> described on [1].

Indeed your approach covers the use case I mentioned, but I'm not sure whether
it is really good.
In your approach, a single background worker process manages all foreign
servers.
That may be OK if there are a few servers, but if there are hundreds of
servers, the interval between checks will grow longer.

Currently, each FDW can decide whether to do health checks per backend.
For example, we can skip health checks if the foreign server is not in use.
A background worker cannot provide that kind of control.

Moreover, the methods to connect to foreign servers and check their health
differ per FDW.
For mysql_fdw [1], we must call mysql_init() and mysql_real_connect().
For file_fdw, we do not have to connect, but developers may want to calculate
a checksum and compare it.
Therefore, we must provide callback functions anyway.

Based on the above, I do not agree with introducing a new background worker
and making it do health checks.

> This is of course how I would approach this problem. I think some other
> perspectives on this would be very useful to hear.

Yes, let's hear other opinions :-).

[1]: https://github.com/EnterpriseDB/mysql_fdw

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

 


RE: Add missing test to test_decoding/meson.build

2022-10-13 Thread kuroda.hay...@fujitsu.com
Dear Michael,

> Thanks, applied.  This was an oversight of 7f13ac8, and the CI accepts
> the test.

I confirmed your commit. Great thanks!


Best Regards,
Hayato Kuroda
FUJITSU LIMITED





Add missing test to test_decoding/meson.build

2022-10-12 Thread kuroda.hay...@fujitsu.com
Hi hackers,

I found that the test catalog_change_snapshot was missing from the
test_decoding/meson.build file.
PSA the patch to fix it.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



0001-add-missing-test-to-test_decoding-meson.build.patch
Description: 0001-add-missing-test-to-test_decoding-meson.build.patch


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-10-12 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thanks for updating the patch!

What you say seems reasonable to me.
I have no more comments for now. Thanks for updating so quickly.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-10-11 Thread kuroda.hay...@fujitsu.com
Dear Sawada-san,

> FYI, as I just replied to the related thread[1], the assertion failure
> in REL14 and REL15 can be fixed by the patch proposed there. So I'd
> like to see how the discussion goes. Regardless of this proposed fix,
> the patch proposed by Kuroda-san is required for HEAD, REL14, and
> REL15, in order to fix the assertion failure in SnapBuildCommitTxn().

I understood that my patches for REL14 and REL15 might not be needed.
I will check the thread later. Thanks!

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-10-11 Thread kuroda.hay...@fujitsu.com
Dear Sawada-san,

Thank you for reviewing HEAD patch! PSA v3 patch.

> +# Test that we can force the top transaction to do timetravel when one of sub
> +# transactions needs that. This is necessary when we restart decoding
> from RUNNING_XACT
> +# without the wal to associate subtransaction to its top transaction.
> 
> I don't think the second sentence is necessary.
> 
> ---
> The last decoding
> +# starts from the first checkpoint and NEW_CID of "s0_truncate"
> doesn't mark the top
> +# transaction as catalog modifying transaction. In this scenario, the
> enforcement sets
> +# needs_timetravel to true even if the top transaction is regarded as
> that it does not
> +# have catalog changes and thus the decoding works without a
> contradition that one
> +# subtransaction needed timetravel while its top transaction didn't.
> 
> I don't understand the last sentence, probably it's a long sentence.
> 
> How about the following description?
> 
> # Test that we can handle the case where only subtransaction is marked
> as containing
> # catalog changes. The last decoding starts from NEW_CID generated by
> "s0_truncate" and
> # marks only the subtransaction as containing catalog changes but we
> don't create the
> # association between top-level transaction and subtransaction yet.
> When decoding the
> # commit record of the top-level transaction, we must force the
> top-level transaction
> # to do timetravel since one of its subtransactions is marked as
> containing catalog changes.

Seems good; I replaced all of the comments with yours.

> + elog(DEBUG2, "forced transaction %u to do timetravel due to one of
> its subtransaction",
> + xid);
> + needs_timetravel = true;
> 
> I think "one of its subtransaction" should be "one of its subtransactions".

Fixed. 

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



HEAD-v3-0001-Fix-assertion-failure-during-logical-decoding.patch
Description:  HEAD-v3-0001-Fix-assertion-failure-during-logical-decoding.patch


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-10-11 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thank you for updating the patch!

> It is not about CREATE INDEX being async. It is about pg_stat_all_indexes
> being async. If we do not wait, the tests become flaky, because sometimes
> the update has not been reflected in the view immediately.

Makes sense, I forgot how the stats collector works...

The following are comments for v16, only about the test code.

~~~
01. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES 
AFTER ANALYZE

```
# show that index_b is not used
$node_subscriber->poll_query_until(
	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-2";
```

poll_query_until() still remains here; it should be replaced with is().


02. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
# show that the unique index on replica identity is used even when enable_indexscan=false
$result = $node_subscriber->safe_psql('postgres',
	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
```

Is the comment wrong? The index test_replica_id_full_idx is not used here.


03. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
$node_publisher->safe_psql('postgres',
	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
```

I was not sure why ALTER TABLE REPLICA IDENTITY USING INDEX was done on the
publisher side.
IIUC this feature works when REPLICA IDENTITY FULL is specified on the
publisher,
so it might not need to be altered here. If so, the index does not have to be
defined on the publisher either.

04. 032_subscribe_use_index.pl - SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN

```
$node_subscriber->poll_query_until(
	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
) or die "Timed out while waiting ensuring subscriber used unique index as replica identity even with enable_indexscan=false'";
```

The same comment as 03 applies here.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-10-10 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thanks for updating the patch! I checked it and it is almost good.
The following are just cosmetic comments.

===
01. relation.c - GetCheapestReplicaIdentityFullPath

```
 * The reason that the planner would not pick partial indexes and indexes
 * with only expressions based on the way currently baserestrictinfos are
 * formed (e.g., col_1 = $1 ... AND col_N = $2).
```

Is "col_N = $2" a typo? I think it should be "col_N = $N" or "attr1 = $1 ... 
AND attrN = $N".

===
02. 032_subscribe_use_index.pl

If a table has a primary key on the subscriber, it will be used even if
enable_indexscan is false (the legacy behavior).
Should we test that?

~~~
03. 032_subscribe_use_index.pl -  SUBSCRIPTION RE-CALCULATES INDEX AFTER 
CREATE/DROP INDEX

This test does not seem trivial, so could you write down the motivation?

~~~
04. 032_subscribe_use_index.pl -  SUBSCRIPTION RE-CALCULATES INDEX AFTER 
CREATE/DROP INDEX

```
# wait until the index is created
$node_subscriber->poll_query_until(
	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via index";
```

CREATE INDEX is synchronous, right? If so we don't have to wait here.
...And the comment in the die case may be wrong.
(There are some other cases like this.)

~~~
05. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS

```
# Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
#
# Basic test where the subscriber uses index
# and touches 50 rows with UPDATE
```

"touches 50 rows with UPDATE" -> "updates 50 rows", per other tests.

~~~
06. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES 
AFTER ANALYZE

This test does not seem trivial, so could you write down the motivation?
(Same as the re-calculate test above.)

~~~
07. 032_subscribe_use_index.pl - SUBSCRIPTION CAN UPDATE THE INDEX IT USES 
AFTER ANALYZE

```
# show that index_b is not used
$node_subscriber->poll_query_until(
	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates two rows via index scan with index on high cardinality column-2";
```

I think we don't have to wait here; is() should be used instead.
poll_query_until() should be used only when idx_scan>0 is checked.
(There are some other cases like this.)

~~~
08. 032_subscribe_use_index.pl - SUBSCRIPTION USES INDEX ON PARTITIONED TABLES

```
# make sure that the subscriber has the correct data
$node_subscriber->poll_query_until(
	'postgres', q{select sum(user_id+value_1+value_2)=550070 AND count(DISTINCT(user_id,value_1, value_2))=981 from users_table_part;}
) or die "ensure subscriber has the correct data at the end of the test";
```

I think we can replace this with wait_for_catchup() and is()...
Moreover, we don't have to wait here at all, because the line above already
waits until the index is used on the subscriber.
(There are some other cases like this.)


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-06 Thread kuroda.hay...@fujitsu.com
Dear Hou,

> Thanks for the suggestion.
> 
> I tried to add a WaitLatch, but it seems affect the performance
> because the Latch might not be set when leader send some
> message to parallel apply worker which means it will wait until
> timeout.

Yes, currently the leader does not notify anything.
To handle that, the leader must set a latch in parallel_apply_send_data().
It can be done if the leader accesses winfo->shared->logicalrep_worker_slot_no
and sets a latch for LogicalRepCtxStruct->worker[slot_no].
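
A rough sketch of what parallel_apply_send_data() could do after a successful
send (field names follow this thread, and this assumes LogicalRepCtx is
accessible from the leader's code; untested):

```
/* Wake up the parallel apply worker so it notices the new message. */
slot_no = winfo->shared->logicalrep_worker_slot_no;
SetLatch(&LogicalRepCtx->workers[slot_no].proc->procLatch);
```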


Best Regards,
Hayato Kuroda
FUJITSU LIMITED


RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-06 Thread kuroda.hay...@fujitsu.com
Dear Hou,

I put comments for v35-0001.

01. catalog.sgml

```
+   Controls how to handle the streaming of in-progress transactions:
+   f = disallow streaming of in-progress transactions,
+   t = spill the changes of in-progress transactions to
+   disk and apply at once after the transaction is committed on the
+   publisher,
+   p = apply changes directly using a parallel apply
+   worker if available (same as 't' if no worker is available)
```

I'm not sure why 't' means "spill the changes to disk". Is it a compatibility
issue?

~~~
02. applyparallelworker.c - parallel_apply_stream_abort

The argument abort_data is not modified in the function, so maybe the "const"
modifier should be added.
(Other functions should also be checked...)

~~~
03. applyparallelworker.c - parallel_apply_find_worker

```
+   ParallelApplyWorkerEntry *entry = NULL;
```

This may not have to be initialized here.

~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
+   static MemoryContext hpm_context = NULL;
```

I think "hpm" means "handle parallel message", so it should be "hpam".

~~~
05. launcher.c - logicalrep_worker_launch()

```
if (is_subworker)
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
else
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
```

I'm not sure why there is only one bgw_type even though there are three types
of apply workers. Is it for compatibility?

~~~
06. launcher.c - logicalrep_worker_stop_by_slot

An assertion like Assert(slot_no >= 0 && slot_no < max_logical_replication_workers)
should be added at the top of this function.

~~~
07. launcher.c - logicalrep_worker_stop_internal

```
+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
```

I think logicalrep_worker_stop_internal() may not be the "workhorse" for
logicalrep_worker_detach(). In that function the internal function is called
only for parallel apply workers, and it is not the main part of the detach logic.

~~~
08. worker.c - handle_streamed_transaction()

```
+   TransactionId current_xid = InvalidTransactionId;
```

This initialization is not needed. The variable is not used in non-streaming
mode; otherwise it is assigned before being used.

~~~
09. worker.c - handle_streamed_transaction()

```
+   case TRANS_PARALLEL_APPLY:
+   /* Define a savepoint for a subxact if needed. */
+   parallel_apply_start_subtrans(current_xid, stream_xid);
+   return false;
```

Based on the other case blocks, Assert(am_parallel_apply_worker()) may be
added at the top of this part.
The same suggestion applies to other switch-case statements.

~~~
10. worker.c - apply_handle_stream_start

```
+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
```

I can understand that the START message is not needed, but is STOP really
removable? If the leader does not send STOP to its child, doesn't it lose the
chance to change the worker state to IDLE_IN_TRANSACTION?

~~~
11. worker.c - apply_handle_stream_start

Currently the number of received chunks is not counted, but it could be if a
variable "nchunks" is defined and incremented in apply_handle_stream_start().
This info may be useful for determining an appropriate
logical_decoding_work_mem for a workload. What do you think?
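
A hypothetical sketch (the counter and the message are mine):

```
/* Number of stream chunks received so far (hypothetical counter). */
static uint32 nchunks = 0;

/* In apply_handle_stream_start(): */
nchunks++;
elog(DEBUG1, "received %u streaming chunks so far", nchunks);
```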

~~~
12. worker.c - get_transaction_apply_action

{} are not needed.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-06 Thread kuroda.hay...@fujitsu.com
Dear Amit,

> Can't we use WaitLatch in the case of SHM_MQ_WOULD_BLOCK as we are
> using it for the same case at some other place in the code? We can use
> the same nap time as we are using in the leader apply worker.

I'm not sure whether such a short nap time is needed.
Unlike the leader apply worker, parallel apply workers do not have a timeout
like wal_receiver_timeout,
so they do not have to check so frequently or send feedback to the publisher.
But basically I agree that we can use the same logic as the leader.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-10-05 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thank you for updating the patch! First, replies to your comments.

> My thinking on those functions is that they should probably stay
> in src/backend/replication/logical/relation.c. My main motivation is that
> those functions are so much tailored to the purposes of this file that I
> cannot see any use-case for these functions in any other context.

I was not sure what it should be, but I agreed that the functions will not be
used from other parts.

> Hmm, I cannot follow this comment. Can you please clarify?

In your patch:

```
+   /* Simple case, we already have a primary key or a replica identity index */
+   idxoid = GetRelationIdentityOrPK(localrel);
+   if (OidIsValid(idxoid))
+       return idxoid;
+
+   /*
+    * If index scans are disabled, use a sequential scan.
+    *
+    * Note that we still allowed index scans above when there is a primary
+    * key or unique index replica identity, but that is the legacy behaviour
+    * (even when enable_indexscan is false), so we hesitate to move this
+    * enable_indexscan check to be done earlier in this function.
+    */
```

And the paragraph "Note that we..." should be above GetRelationIdentityOrPK().
Future readers will read the function from top to bottom,
and when they read around GetRelationIdentityOrPK() they may be confused.

> So, I think it is better to have specific names, no?

OK.

> The inlined comment in the function has a similar comment. Is that clear
> enough?
> /* * Generate restrictions for all columns in the form of col_1 = $1 AND *
> col_2 = $2 ... */

Actually I missed it, but I still think the whole emulated SQL statement
should be spelled out.

> Though, I agree that we can improve the code a bit. I now
> use targetrelkind and dropped localrelid to check whether the target is a
> partitioned table. Is this better?

Great improvement. Genius!

> Well, I'm not sure if it is worth the complexity. There are only 4 usages
> of the same table, and these are all pretty simple statements, and all
> other tests seem to have a similar pattern. I have not seen any tests where
> these simple statements are done in a function even if they are repeated.
> I'd rather keep it so that this doesn't lead to other style discussions?

If other tests do not combine such parts, it's OK.
My motivation for these comments was to reduce the number of lines of test
code.

> Oh, I didn't know about this, thanks!

Now the meson test system runs your test. OK.


And the following are the comments for v14. They are mainly about code comments.

===
01. relation.c - logicalrep_rel_open

```
+   /*
+    * Finding a usable index is an infrequent task. It occurs when an
+    * operation is first performed on the relation, or after invalidation
+    * of the relation cache entry (e.g., such as ANALYZE).
+    */
+   entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
```

I thought you could mention CREATE INDEX in the comment.

According to your analysis [1], the relation cache will be invalidated if
users run CREATE INDEX.
At that time the hash entry will be removed (logicalrep_relmap_invalidate_cb)
and the "usable" index
will be checked again.

~~~
02. relation.c - logicalrep_partition_open

```
+   /*
+    * Finding a usable index is an infrequent task. It occurs when an
+    * operation is first performed on the relation, or after invalidation of
+    * the relation cache entry (e.g., such as ANALYZE).
+    */
+   entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
+
```

Same as above

~~~
03. relation.c - GetIndexOidFromPath

```
+   if (path->pathtype == T_IndexScan || path->pathtype == T_IndexOnlyScan)
+   {
+   IndexPath  *index_sc = (IndexPath *) path;
+
+   return index_sc->indexinfo->indexoid;
+   }
```

I thought Assert(OidIsValid(indexoid)) might be added here. Or is it too
trivial?

~~~
04. relation.c - IndexOnlyOnExpression

This method just returns "yes" or "no", so its name should start with "Has"
or "Is".

~~~
05. relation.c - SuitablePathsForRepIdentFull

```
+/*
+ * Iterates over the input path list and returns another
+ * path list that includes index [only] scans where paths
+ * with non-btree indexes, partial indexes or
+ * indexes on only expressions have been removed.
+ */
```

These lines seem to be wrapped at around 60 columns. Could you expand them to around 80?

~~~
06. relation.c - SuitablePathsForRepIdentFull

```
+   RelationindexRelation;
+   IndexInfo  *indexInfo;
+   boolis_btree;
+   boolis_partial;
+   boolis_only_on_expression;
+
+   indexRelation = index_open(idxoid, AccessShareLock);
+   indexInfo = BuildIndexInfo(indexR

RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-03 Thread kuroda.hay...@fujitsu.com
Dear Önder,

> As far as I can see this patch is mostly useful for detecting the failures
> on the initial remote command. This is especially common when the remote
> server does a failover/switchover and postgres_fdw uses a cached connection
> to access to the remote server.

Sounds reasonable. Do you mean that we can add an additional GUC like
"postgres_fdw.initial_check",
wait on WL_SOCKET_CLOSED if the connection is found in the hash table, and
reconnect if it appears to be closed, right?
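
To illustrate, a hypothetical registration of such a GUC in postgres_fdw's
_PG_init() (the name and variable are mine, not from the patch):

```
static bool pgfdw_initial_check = false;

DefineCustomBoolVariable("postgres_fdw.initial_check",
						 "Check a cached connection before reusing it.",
						 NULL,
						 &pgfdw_initial_check,
						 false,
						 PGC_USERSET,
						 0,
						 NULL, NULL, NULL);
```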

> I think any extension that deals with multiple Postgres nodes can benefit
> from such logic. In fact, the reason I realized this patch is that on the
> Citus extension, we are trying to solve a similar problem [1], [2].

> Thinking even more, I think any extension that uses libpq and WaitEventSets
> can benefit from such a function.

OK, I agree it may be useful, but where should this function live?
It cannot be used from applications, so it should not be in the
interfaces/libpq directory.
So backend/libpq/pqcomm.c?

> I think it also depends on where you decide to put
> pgfdw_connection_check_internal(). If you prefer the postgres_fdw side,
> could we maybe use ConnCacheEntry in contrib/postgres_fdw/connection.c?
> But if you decide to put it into the Postgres side, the API
> for pgfdw_connection_check_internal() -- or equivalent function -- could be
> discussed. Do we pass a WaitEventSet and if it is NULL create a new one,
> else use what is passed to the function? Not sure, maybe you can come up
> with a better API.

Thank you for describing it in more detail. Now I see what you mean.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-03 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thank you for your interest in my patch! Your suggestions will be included in
the next version.

> In other words, what is the trade-off for calling
> pgfdw_connection_check_internal() inside GetConnection() when we are about
> to use a "cached" connection? I think that might simplify the patch as well

If the checking function is called from GetConnection() instead of
periodically,
the health of foreign servers will be checked only when remote connections
are used.
So the issue in the following workload, described in [1], cannot be handled:

BEGIN --- remote operations --- local operations --- COMMIT

But yes, I perfectly agree that it could simplify the code,
because we can remove the timer part. That is the second plan for this patch;
I may move to that approach if it is still useful.

> Can we have this function/logic on Postgres core, so that other extensions
> can also use?

I was not sure about any use case, but I think it can, because it does quite
general things.
Is there any good motivation for doing that?

> What if PQsocket(conn) returns -1? Maybe we move certain connection state
> checks into pgfdw_connection_check_internal() such that it is more generic?
> I can think of checks like: conn!=NULL, PQsocket(conn) != PGINVALID_SOCKET,
> PQstatus == CONNECTION_OK

ereport(ERROR) will be thrown if PQsocket(conn) returns -1.
Everything you mentioned should be handled there. I will modify it.

> Do you see any performance implications of creating/freeing waitEventSets
> per check? I wonder if we can somehow re-use the same waitEventSet by
> modifyWaitEvent? I guess no, but still, if this check causes a performance
> implication, can we somehow cache 1 waitEventSet per connection?

I have not tested it yet, but I agree this could cause a performance decrease.
In the next version I will re-use the event set anyway, and it must be
considered further later.
Actually I'm not sure about your suggestion,
but do you mean that we can add a hash table that associates a PGconn with a
WaitEventSet, right?
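
For example, a hypothetical cache entry and its one-time setup might look like
this (the struct is mine; WL_SOCKET_CLOSED is the event this patch already
relies on):

```
/* Hypothetical cache entry associating a connection with its event set. */
typedef struct ConnCheckEntry
{
	PGconn	   *conn;			/* hash key */
	WaitEventSet *eventset;		/* created once, reused for every check */
} ConnCheckEntry;

/* On first use: */
entry->eventset = CreateWaitEventSet(CurrentMemoryContext, 1);
AddWaitEventToSet(entry->eventset, WL_SOCKET_CLOSED,
				  PQsocket(entry->conn), NULL, NULL);
```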

[1]: 
https://www.postgresql.org/message-id/TYAPR01MB58662809E678253B90E82CE5F5889%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-10-03 Thread kuroda.hay...@fujitsu.com
Dear Andres,

> This seems to reliably fail on windows. See

Thanks for reporting. Actually this feature cannot be used on Windows machines.
To check the status of each socket that connects to a foreign server,
the socket event WL_SOCKET_CLOSED is used.
The event is only available on some OSes, and Windows is not among them.

That part must be skipped if the event cannot be used on the system, but I was
not sure how to do that...

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Question: test "aggregates" failed in 32-bit machine

2022-09-29 Thread kuroda.hay...@fujitsu.com
Dear Tom,

> Hmm, we're not seeing any such failures in the buildfarm's 32-bit
> animals, so there must be some additional condition needed to make
> it happen.  Can you be more specific?

Hmm, I am not sure about additional conditions, sorry.
I could reproduce it with the following steps:

$ git clone https://github.com/postgres/postgres.git
$ cd postgres
$ ./configure --enable-cassert --enable-debug
$ make -j2
$ make check LANG=C

->  aggregates   ... FAILED 3562 ms




The hypervisor of the virtual machine is "VMware vSphere 7.0".

I also picked some information related to the machine.
Could you find something?

```

$ ./pg_config
...
CONFIGURE =  '--enable-cassert' '--enable-debug'
CC = gcc -std=gnu99
CPPFLAGS = -D_GNU_SOURCE
CFLAGS = -Wall -Wmissing-prototypes -Wpointer-arith 
-Wdeclaration-after-statement -Werror=vla -Wendif-labels 
-Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -g -O2
CFLAGS_SL = -fPIC
LDFLAGS = -Wl,--as-needed -Wl,-rpath,'/usr/local/pgsql/lib',--enable-new-dtags
LDFLAGS_EX = 
LDFLAGS_SL = 
LIBS = -lpgcommon -lpgport -lz -lreadline -lrt -ldl -lm 
VERSION = PostgreSQL 16devel

$ locale
LANG=C
...

$ arch 
i686


$cat /proc/cpuinfo 
processor   : 0
vendor_id   : GenuineIntel
cpu family  : 6
model   : 85
model name  : Intel(R) Xeon(R) Platinum 8260 CPU @ 2.40GHz
stepping: 7
microcode   : 83898371
cpu MHz : 2394.374
cache size  : 36608 KB
physical id : 0
siblings: 1
core id : 0
cpu cores   : 1
apicid  : 0
initial apicid  : 0
fdiv_bug: no
hlt_bug : no
f00f_bug: no
coma_bug: no
fpu : yes
fpu_exception   : yes
cpuid level : 22
wp  : yes
flags   : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov 
pat pse36 clflush mmx fxsr sse sse2 ss nx pdpe1gb rdtscp lm constant_tsc 
arch_perfmon xtopology tsc_reliable nonstop_tsc unfair_spinlock eagerfpu pni 
pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt 
tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 
3dnowprefetch arat xsaveopt ssbd ibrs ibpb stibp fsgsbase bmi1 avx2 smep bmi2 
invpcid avx512f rdseed adx avx512cd md_clear flush_l1d arch_capabilities
bogomips: 4788.74
clflush size: 64
cache_alignment : 64
address sizes   : 43 bits physical, 48 bits virtual
power management:

processor   : 1
vendor_id   : GenuineIntel
cpu family  : 6
model   : 85
model name  : Intel(R) Xeon(R) Platinum 8260 CPU @ 2.40GHz
stepping: 7
microcode   : 83898371
cpu MHz : 2394.374
cache size  : 36608 KB
physical id : 2
siblings: 1
core id : 0
cpu cores   : 1
apicid  : 2
initial apicid  : 2
fdiv_bug: no
hlt_bug : no
f00f_bug: no
coma_bug: no
fpu : yes
fpu_exception   : yes
cpuid level : 22
wp  : yes
flags   : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov 
pat pse36 clflush mmx fxsr sse sse2 ss nx pdpe1gb rdtscp lm constant_tsc 
arch_perfmon xtopology tsc_reliable nonstop_tsc unfair_spinlock eagerfpu pni 
pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt 
tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 
3dnowprefetch arat xsaveopt ssbd ibrs ibpb stibp fsgsbase bmi1 avx2 smep bmi2 
invpcid avx512f rdseed adx avx512cd md_clear flush_l1d arch_capabilities
bogomips: 4788.74
clflush size: 64
cache_alignment : 64
address sizes   : 43 bits physical, 48 bits virtual
power management
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-29 Thread kuroda.hay...@fujitsu.com
Dear Hou,

Thanks for updating the patch. I will review it soon, but first I reply to
your comment.

> > 04. applyparallelworker.c - LogicalParallelApplyLoop()
> >
> > ```
> > +   shmq_res = shm_mq_receive(mqh, &len, &data, false);
> > ...
> > +   if (ConfigReloadPending)
> > +   {
> > +   ConfigReloadPending = false;
> > +   ProcessConfigFile(PGC_SIGHUP);
> > +   }
> > ```
> >
> >
> > Here the parallel apply worker waits to receive messages and after 
> > dispatching
> > it ProcessConfigFile() is called.
> > It means that .conf will be not read until the parallel apply worker 
> > receives new
> > messages and apply them.
> >
> > It may be problematic when users change log_min_message to debugXXX for
> > debugging but the streamed transaction rarely come.
> > They expected that detailed description appears on the log from next
> > streaming chunk, but it does not.
> >
> > This does not occur in leader worker when it waits messages from publisher,
> > because it uses libpqrcv_receive(), which works asynchronously.
> >
> > I 'm not sure whether it should be documented that the evaluation of GUCs 
> > may
> > be delayed, how do you think?
> 
> I changed the shm_mq_receive to asynchronous mode which is also consistent
> with
> what we did for Gather node when reading data from parallel query workers.

I checked your implementation, but it seems that the parallel apply worker
will not sleep
even if there are no messages or signals. That might be very inefficient.

In the gather node - gather_readnext() - the same approach is used, but I
think there is a premise
that the wait time is short because it is related to only one gather node.
For a parallel apply worker, however, we cannot predict the wait time because
it depends on the streamed transactions. If such transactions rarely come,
parallel apply workers may consume a lot of CPU time.

I think we should wait for a short time, or until the leader notifies us, if
shmq_res == SHM_MQ_WOULD_BLOCK.
What do you think?
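
A rough sketch of the wait I have in mind (the timeout and wait event name are
just placeholders, and this assumes the leader sets our latch when it sends
data):

```
if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
	int			rc;

	/* Sleep until the leader sets our latch or the timeout elapses. */
	rc = WaitLatch(MyLatch,
				   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
				   1000L,
				   WAIT_EVENT_LOGICAL_APPLY_MAIN);

	if (rc & WL_LATCH_SET)
		ResetLatch(MyLatch);

	CHECK_FOR_INTERRUPTS();
}
```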


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



Question: test "aggregates" failed in 32-bit machine

2022-09-28 Thread kuroda.hay...@fujitsu.com
Hi hackers,

While running `make check LANG=C` on a 32-bit virtual machine,
I found that it failed at "aggregates". PSA a1b3bca1_regression.diffs.
IIUC that part was added by db0d67db.
I checked out that source, tested, and got the same result. PSA
db0d67db_regression.diffs.

I'm not sure about it, but is this expected behavior? I know that we do not
have to
consider "row" ordering, though.

The following shows the environment. Please tell me if more information is
needed.

OS: RHEL 6.10 server 
Arch: i686
Gcc: 4.4.7

$ uname -a
Linux VMX 2.6.32-754.41.2.el6.i686 #1 SMP Sat Jul 10 04:21:20 EDT 2021 i686 
i686 i386 GNU/Linux

Configure option: --enable-cassert --enable-debug --enable-tap-tests

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



a1b3bca1_regression.diffs
Description: a1b3bca1_regression.diffs


db0d67db_regression.diffs
Description: db0d67db_regression.diffs


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-09-27 Thread kuroda.hay...@fujitsu.com
Dear Önder:

Thank you for updating patch! 
Your documentation seems OK, and I could not find any other places where it
should be added.

The following are my comments.


01 relation.c - general

Many files are newly included.
I was not sure, but some of the planner-related code may be able to move to
src/backend/optimizer/plan.
What do you and others think?

02 relation.c - FindLogicalRepUsableIndex

```
+/*
+ * Returns an index oid if we can use an index for the apply side. If not,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
```

I grepped the files, but I cannot find the word "apply side". How about
"subscriber" instead?

03 relation.c - FindLogicalRepUsableIndex

```
+   /* Simple case, we already have an identity or pkey */
+   idxoid = GetRelationIdentityOrPK(localrel);
+   if (OidIsValid(idxoid))
+       return idxoid;
+
+   /*
+    * If index scans are disabled, use a sequential scan.
+    *
+    * Note that we still allowed index scans above when there is a primary
+    * key or unique index replica identity, but that is the legacy behaviour
+    * (even when enable_indexscan is false), so we hesitate to move this
+    * enable_indexscan check to be done earlier in this function.
+    */
+   if (!enable_indexscan)
+       return InvalidOid;
```

a.
I think "identity or pkey" should be "replica identity key or primary key" or
"RI or PK".

b.
The latter part should be moved near GetRelationIdentityOrPK().


04 relation.c - FindUsableIndexForReplicaIdentityFull

```
+   MemoryContext usableIndexContext;
...
+   usableIndexContext = AllocSetContextCreate(CurrentMemoryContext,
+                                              "usableIndexContext",
+                                              ALLOCSET_DEFAULT_SIZES);
```

I grepped other sources, and I found that a name like "tmpcxt" is used for
temporary MemoryContexts.

05 relation.c - SuitablePathsForRepIdentFull

```
+   indexRelation = index_open(idxoid, AccessShareLock);
+   indexInfo = BuildIndexInfo(indexRelation);
+   is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+   is_partial = (indexInfo->ii_Predicate != NIL);
+   is_only_on_expression = IndexOnlyOnExpression(indexInfo);
+   index_close(indexRelation, NoLock);
```

Why is the index closed with NoLock? AccessShareLock was acquired, so
shouldn't the same lock be released?


06 relation.c - GetCheapestReplicaIdentityFullPath

IIUC a query like "SELECT tbl WHERE attr1 = $1 AND attr2 = $2 ... AND attrN =
$N" is emulated, right?
You could write it out explicitly as a comment.

07 relation.c - GetCheapestReplicaIdentityFullPath

```
+   Path   *path = (Path *) lfirst(lc);
+   Oid idxoid = GetIndexOidFromPath(path);
+
+   if (!OidIsValid(idxoid))
+   {
+   /* Unrelated Path, skip */
+   suitableIndexList = lappend(suitableIndexList, path);
+   }
```

I was not clear here. IIUC in this function we want to extract "good" scan
paths and choose the cheapest one.
GetIndexOidFromPath() seems to return InvalidOid when the input path is not an
index scan, so why is such a path appended to the suitable list?


===
08 worker.c - usable_indexoid_internal

I think this is not an "internal" function; such names are used for pairs like
"apply_handle_commit" - "apply_handle_commit_internal", or
"apply_handle_insert" - "apply_handle_insert_internal".
How about "get_usable_index" or something?

09 worker.c - usable_indexoid_internal

```
+   Oid         targetrelid = targetResultRelInfo->ri_RelationDesc->rd_rel->oid;
+   Oid         localrelid = relinfo->ri_RelationDesc->rd_id;
+
+   if (targetrelid != localrelid)
```

I think these lines are very confusing.
IIUC targetrelid corresponds to the "parent" and localrelid corresponds to the
"child", right?
How about changing the names to "partitionedoid" and "leafoid" or something?

===
10 032_subscribe_use_index.pl

```
# create tables pub and sub
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_replica_id_full (x int)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_replica_id_full (x int)");
$node_subscriber->safe_psql('postgres',
"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
```

In many places the same table is defined, altered to "REPLICA IDENTITY FULL",
and an index is created.
Could you combine these into a function?

11 032_subscribe_use_index.pl

```
# wait until the index is used on the subscri

RE: [small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"

2022-09-27 Thread kuroda.hay...@fujitsu.com
Dear Michael,

> Yeah, at least as of the cancel callback psql_cancel_callback() that
> handle_sigint() would call on SIGINT as this is set by psql.  So it
> does not seem right to use a boolean rather than a sig_atomic_t in
> this case, as well.

PSA the fix patch. Note that PromptInterruptContext.enabled was also fixed
because it is assigned from sigint_interrupt_enabled.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



0001-Mark-sigint_interrupt_enabled-as-sig_atomic_t.patch
Description: 0001-Mark-sigint_interrupt_enabled-as-sig_atomic_t.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-26 Thread kuroda.hay...@fujitsu.com
Dear Wang,

The following are comments for your patch set.


0001


01. launcher.c - logicalrep_worker_stop_internal()

```
+
+   Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
```

I think it should be Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock,
LW_SHARED)),
because the lock is released once and acquired again as LW_SHARED.
If a newer function acquires the lock as LW_EXCLUSIVE and calls
logicalrep_worker_stop_internal(),
its lock may become weaker after calling it.

02. launcher.c - apply_handle_stream_start()

```
+   /*
+    * Notify handle methods we're processing a remote in-progress
+    * transaction.
+    */
+   in_streamed_transaction = true;
 
-   MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
-   FileSetInit(MyLogicalRepWorker->stream_fileset);
+   /*
+    * Start a transaction on stream start, this transaction will be
+    * committed on the stream stop unless it is a tablesync worker in
+    * which case it will be committed after processing all the
+    * messages. We need the transaction for handling the buffile,
+    * used for serializing the streaming data and subxact info.
+    */
+   begin_replication_step();
```

Previously in_streamed_transaction was set after begin_replication_step(),
but the ordering has been changed. Maybe we should not modify it unless there
is a particular reason.

03. launcher.c - apply_handle_stream_stop()

```
+   /* Commit the per-stream transaction */
+   CommitTransactionCommand();
+
+   /* Reset per-stream context */
+   MemoryContextReset(LogicalStreamingContext);
+
+   pgstat_report_activity(STATE_IDLE, NULL);
+
+   in_streamed_transaction = false;
```

Previously in_streamed_transaction was set after MemoryContextReset(), but the
ordering has been changed.
Maybe we should not modify it unless there is a particular reason.

04. applyparallelworker.c - LogicalParallelApplyLoop()

```
+   shmq_res = shm_mq_receive(mqh, &len, &data, false);
...
+   if (ConfigReloadPending)
+   {
+   ConfigReloadPending = false;
+   ProcessConfigFile(PGC_SIGHUP);
+   }
```


Here the parallel apply worker waits to receive messages, and
ProcessConfigFile() is called after dispatching them.
It means that the .conf file will not be read until the parallel apply worker
receives new messages and applies them.

It may be problematic when users change log_min_messages to debugXXX for
debugging but streamed transactions rarely come.
They expect detailed output to appear in the log from the next streaming
chunk, but it does not.

This does not occur in the leader worker when it waits for messages from the
publisher, because it uses libpqrcv_receive(), which works asynchronously.

I'm not sure whether it should be documented that the evaluation of GUCs may
be delayed; what do you think?

===
0004

05. logical-replication.sgml

```
...
In that case, it may be necessary to change the streaming mode to on or off and 
cause
the same conflicts again so the finish LSN of the failed transaction will be 
written to the server log.
 ...
```

The above sentence was added by 0001, but it is not modified by 0004.
Such transactions will be retried in streaming=on mode, so some descriptions
related to that should be added.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"

2022-09-26 Thread kuroda.hay...@fujitsu.com
Dear Michael,

> Done this one.  I have scanned the code, but did not notice a similar
> mistake. 

I found your commit, thanks!

> It is worth noting that we have only one remaining "volatile
> bool" in the headers now.

Maybe you meant sigint_interrupt_enabled,
which also seems to be modified in the signal handler.
But I think no race condition can occur here,
because if the value is set in the handler, the code jump will also happen.

Of course it's OK to mark that variable as sig_atomic_t too if there is no
problem.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-26 Thread kuroda.hay...@fujitsu.com
Dear Wang, 

Thanks for updating the patch!... but cfbot says that it cannot be accepted [1].
I think the header should be included, like miscadmin.h.

[1]: https://cirrus-ci.com/task/5909508684775424

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



[small patch] Change datatype of ParallelMessagePending from "volatile bool" to "volatile sig_atomic_t"

2022-09-25 Thread kuroda.hay...@fujitsu.com
Hi hackers,

While reviewing [1], Amit and I noticed that the flag ParallelMessagePending
is defined
as "volatile bool", but other flags set by signal handlers are defined as
"volatile sig_atomic_t".

The datatype is defined in standard C,
which says that variables referred to by signal handlers should be "volatile
sig_atomic_t".
(Please see my observation [2].)

This may not be strictly needed because no failures have been reported,
but I thought their datatypes should be the same, so I attached a small patch.

What do you think?

[1]: https://commitfest.postgresql.org/39/3621/
[2]: 
https://www.postgresql.org/message-id/TYAPR01MB5866C056BB9F81A42B85D20BF54E9%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



0001-Change-datatype-of-ParallelMessagePending-to-keep-co.patch
Description:  0001-Change-datatype-of-ParallelMessagePending-to-keep-co.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-22 Thread kuroda.hay...@fujitsu.com
Hi Amit,

> > I checked other flags that are set by signal handlers, their datatype 
> > seemed to
> be sig_atomic_t.
> > Is there any reasons that you use normal bool? It should be changed if not.
> >
> 
> It follows the logic similar to ParallelMessagePending. Do you see any
> problem with it?

Hmm, one consideration is:
what happens if the signal handler HandleParallelApplyMessageInterrupt() fires
during "ParallelApplyMessagePending = false;"?
IIUC sig_atomic_t is needed to avoid writing to the same data at the same
time.

According to the C99 standard (I grepped the draft version [1]), the behavior
seems to be undefined if the signal handler
accesses data that is not "volatile sig_atomic_t".
...But I'm not sure whether this is really problematic in the current system,
sorry...

```
If the signal occurs other than as the result of calling the abort or raise 
function,
the behavior is undefined if the signal handler refers to any object with 
static storage duration other than by assigning a value to an object declared 
as volatile sig_atomic_t,
or the signal handler calls any function in the standard library other than the 
abort function,
the _Exit function, or the signal function with the first argument equal to the 
signal number corresponding to the signal that caused the invocation of the 
handler.
```

> > a.
> > I was not sure when the cell should be cleaned. Currently we clean up
> ParallelApplyWorkersList() only in the parallel_apply_start_worker(),
> > but we have chances to remove such a cell like HandleParallelApplyMessages()
> or HandleParallelApplyMessage(). How do you think?
> >
> 
> Note that HandleParallelApply* are invoked during interrupt handling,
> so it may not be advisable to remove it there.
> 
> >
> > 12. ConfigureNamesInt
> >
> > ```
> > +   {
> > +   {"max_parallel_apply_workers_per_subscription",
> > +   PGC_SIGHUP,
> > +   REPLICATION_SUBSCRIBERS,
> > +   gettext_noop("Maximum number of parallel apply
> workers per subscription."),
> > +   NULL,
> > +   },
> > +   &max_parallel_apply_workers_per_subscription,
> > +   2, 0, MAX_BACKENDS,
> > +   NULL, NULL, NULL
> > +   },
> > ```
> >
> > This parameter can be changed by pg_ctl reload, so the following corner case
> may be occurred.
> > Should we add a assign hook to handle this? Or, can we ignore it?
> >
> 
> I think we can ignore this as it will eventually start respecting the 
> threshold.

Both of your points sound reasonable to me.

[1]: https://www.open-std.org/JTC1/SC22/WG14/www/docs/n1256.pdf

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-22 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thanks for updating the patch! The following are comments for v33-0001.

===
libpqwalreceiver.c

01. inclusion

```
+#include "catalog/pg_subscription.h"
```

We don't have to include it because the analysis of parameters is done at the
caller.

===
launcher.c

02. logicalrep_worker_launch()

```
+   /*
+* Return silently if the number of parallel apply workers reached the
+* limit per subscription.
+*/
+   if (is_subworker && nparallelapplyworkers >= 
max_parallel_apply_workers_per_subscription)
```

a.
It might be kind to output a debug message here.

b.
The if statement seems to exceed 80 characters. You can break it onto a new
line around "nparallelapplyworkers >= ...".


===
applyparallelworker.c

03. declaration

```
+/*
+ * Is there a message pending in parallel apply worker which we need to
+ * receive?
+ */
+volatile bool ParallelApplyMessagePending = false;
```

I checked other flags that are set by signal handlers, and their datatype
seemed to be sig_atomic_t.
Is there any reason you use a plain bool? It should be changed if not.

04. HandleParallelApplyMessages()

```
+   if (winfo->error_mq_handle == NULL)
+   continue;
```

a.
I was not sure when the cell should be cleaned. Currently we clean up
ParallelApplyWorkersList only in parallel_apply_start_worker(),
but we also have chances to remove such a cell in HandleParallelApplyMessages()
or HandleParallelApplyMessage(). What do you think?

b.
A comment should be added even if we keep this, like "the worker has exited,
skip it".

```
+   else
+   ereport(ERROR,
+   
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("lost connection to the leader 
apply worker")));
```

c.
This function is called in the leader apply worker, so the message should be
"lost connection to the parallel apply worker".

05. parallel_apply_setup_worker()

```
+   if (launched)
+   {
+   ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, 
winfo);
+   }
```

{} should be removed.
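
That is, something like:

```
	if (launched)
		ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
```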


06. parallel_apply_wait_for_xact_finish()

```
+   /* If any workers have died, we have failed. */
```

This function checks only a single parallel apply worker, so the comment
should be "If the worker has..."?

===
worker.c

07. handle_streamed_transaction()

```
+ * For non-streamed transactions, returns false;
```

"returns false;" -> "returns false"

apply_handle_commit_prepared(), apply_handle_abort_prepared()

These functions are not expected to be called by a parallel apply worker,
so I think an Assert() should be added.
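
For example (a sketch; the exact helper name am_parallel_apply_worker() is my
assumption about the patch):

```
	/* These paths are not expected to run in a parallel apply worker. */
	Assert(!am_parallel_apply_worker());
```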

08. UpdateWorkerStats()

```
-static void
+void
 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
```

This function is called only in worker.c, should be static.

09. subscription_change_cb()

```
-static void
+void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
```

This function is called only in worker.c, should be static.

10. InitializeApplyWorker()

```
+/*
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
 void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
```

A comment should be added saying that this is the common part for the leader
and parallel apply workers.

===
logicalrepworker.h

11. declaration

```
extern PGDLLIMPORT volatile bool ParallelApplyMessagePending;
```

Please refer to the above comment (03).
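
That is, matching the datatype change suggested in comment 03, the declaration
would become:

```
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
```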

===
guc_tables.c

12. ConfigureNamesInt

```
+   {
+   {"max_parallel_apply_workers_per_subscription",
+   PGC_SIGHUP,
+   REPLICATION_SUBSCRIBERS,
+   gettext_noop("Maximum number of parallel apply workers 
per subscription."),
+   NULL,
+   },
+   &max_parallel_apply_workers_per_subscription,
+   2, 0, MAX_BACKENDS,
+   NULL, NULL, NULL
+   },
```

This parameter can be changed by pg_ctl reload, so the following corner case
may occur.
Should we add an assign hook to handle this? Or can we ignore it?

1. set max_parallel_apply_workers_per_subscription to 4.
2. start replicating two streaming transactions.
3. commit transactions
=== Two parallel workers will be remained ===
4. change max_parallel_apply_workers_per_subscription to 3
5. We expected that only one worker would remain, but two parallel workers
remained.
  They will not be stopped until another streamed transaction is started and
committed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-09-21 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thanks for checking!

> These failed to be applied to the master branch cleanly. Could you update 
> them?

PSA rebased patches. I reviewed them myself and they contain some changes,
e.g., moving the GUC-related code to option.c.


> +  this option relies on kernel events exposed by Linux, macOS,
> 
> s/this/This

Fixed.

> 
> + GUC_check_errdetail("pgfdw_health_check_interval must be set
> to 0 on this platform");
> 
> The actual parameter name "postgres_fdw.health_check_interval"
> should be used for the message instead of internal variable name.

Fixed.

> This registered signal handler does lots of things. But that's not acceptable
> and they should be performed outside signal handler. No?


I modified it like v09 and earlier versions, which had a mechanism for
registering CheckingRemoteServersCallback.
It had been removed because we wanted to keep the core simpler, but IIUC it is
needed if the signal handler just sets some flags.
For simplicity, the core side does not consider the current status of the
transaction or the running query.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v15-0001-Add-an-infrastracture-for-checking-remote-server.patch
Description:  v15-0001-Add-an-infrastracture-for-checking-remote-server.patch


v15-0002-postgres_fdw-Implement-health-check-feature.patch
Description:  v15-0002-postgres_fdw-Implement-health-check-feature.patch


v15-0003-add-doc.patch
Description: v15-0003-add-doc.patch


v15-0004-add-test.patch
Description: v15-0004-add-test.patch


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-09-20 Thread kuroda.hay...@fujitsu.com
> One last thing - do you think there is any need to mention this
> behaviour in the pgdocs, or is OK just to be a hidden performance
> improvement?

FYI - here is my opinion.
We have the following sentence in logical-replication.sgml:

```
...
If the table does not have any suitable key, then it can be set
   to replica identity full, which means the entire row becomes
   the key.  This, however, is very inefficient and should only be used as a
   fallback if no other solution is possible.
...
```

Here the words "very inefficient" may mean that sequential scans will be
executed every time.
I think some description could be added around here.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-09-19 Thread kuroda.hay...@fujitsu.com
Dear Önder,

Thanks for updating the patch! I will check it later.
For now, I am just replying to your comments.

> Hmm, I couldn't realize this comment earlier. So you suggest "slow" here 
> refers to the additional function call "GetRelationIdentityOrPK"? If so, yes 
> I'll update that.

Yes I meant to say that, because functions will be called like:

GetRelationIdentityOrPK() -> RelationGetPrimaryKeyIndex() -> 
RelationGetIndexList() -> ..

and according to the comments, the last one seems to do the heavy lifting.


> Makes sense, simplified the function. Though, it is always hard to pick good 
> names for these kinds of helper functions. I picked 
> GenerateDummySelectPlannerInfoForRelation(), does that sound good to you as 
> well?

I could not find any better naming than yours. 

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2022-09-15 Thread kuroda.hay...@fujitsu.com
Dear  Önder,

Thank you for proposing a good feature. I'm also interested in the patch,
so I started to review it. The following are my initial comments.

===
For execRelation.c

01. RelationFindReplTupleByIndex()

```
/* Start an index scan. */
InitDirtySnapshot(snap);
-   scan = index_beginscan(rel, idxrel, &snap,
-  
IndexRelationGetNumberOfKeyAttributes(idxrel),
-  0);
 
/* Build scan key. */
-   build_replindex_scan_key(skey, rel, idxrel, searchslot);
+   scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, 
searchslot);
 
+   scan = index_beginscan(rel, idxrel, &snap, scankey_attoff, 0);
```

I think "/* Start an index scan. */" should be just above index_beginscan().
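
That is, something like:

```
	/* Build scan key. */
	scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);

	/* Start an index scan. */
	scan = index_beginscan(rel, idxrel, &snap, scankey_attoff, 0);
```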

===
For worker.c

02. usable_indexoid_internal()

```
+ * Note that if the corresponding relmapentry has InvalidOid usableIndexOid,
+ * the function returns InvalidOid.
+ */
+static Oid
+usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo)
```

"InvalidOid usableIndexOid" should be "invalid usableIndexOid,"

03. check_relation_updatable()

```
 * We are in error mode so it's fine this is somewhat slow. It's better 
to
 * give user correct error.
 */
-   if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+   if (OidIsValid(rel->usableIndexOid))
{
```

Shouldn't we change the above comment accordingly? The check is no longer slow.

===
For relation.c

04. GetCheapestReplicaIdentityFullPath()

```
+static Path *
+GetCheapestReplicaIdentityFullPath(Relation localrel)
+{
+   PlannerInfo *root;
+   Query  *query;
+   PlannerGlobal *glob;
+   RangeTblEntry *rte;
+   RelOptInfo *rel;
+   int attno;
+   RangeTblRef *rt;
+   List *joinList;
+   Path *seqScanPath;
```

I think the part that constructs the dummy planner state can be moved to
another function
because that part is not meaningful here.
Especially lines 824-846 can.


===
For 032_subscribe_use_index.pl

05. general

```
+# insert some initial data within the range 0-1000
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO test_replica_id_full SELECT i%20 FROM 
generate_series(0,1000)i;"
+);
```

It seems that the range of the initial data is actually [0, 19].
Similar mistake candidates are found in many places.

06. general

```
+# updates 1000 rows
+$node_publisher->safe_psql('postgres',
+   "UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
```

Only 50 tuples are modified here.
Similar mistake candidates are found in many places.

07. general

```
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+   'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where 
indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_3 
updates 200 rows via index"; 
```
The query will be executed until the index scan is finished, but that is not
mentioned in the comment.
How about changing it to "we wait until the index is used on the subscriber
side" or something?
Similar comments are found in many places.

08. test related with ANALYZE

```
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - 
PARTITIONED TABLE
+# 
```

"Testcase start:" should be "Testcase end:" here.

09. general

In some tests the results are confirmed, but in other tests they are not.
I think you should verify that the results are as expected in every case,
unless there is a particular reason not to.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: logical replication restrictions

2022-09-14 Thread kuroda.hay...@fujitsu.com
Hi,

Sorry for noise but I found another bug.
When 032_apply_delay.pl is modified as follows,
the test always fails, even with my patch applied.

```
# Disable subscription. worker should die immediately.
-$node_subscriber->safe_psql('postgres',
-   "ALTER SUBSCRIPTION tap_sub DISABLE"
+$node_subscriber->safe_psql('postgres', q{
+BEGIN;
+ALTER SUBSCRIPTION tap_sub DISABLE;
+SELECT pg_sleep(1);
+COMMIT;
+}
 );
```

The point of failure is the same as I reported previously.

```
...
2022-09-14 12:00:48.891 UTC [11330] 032_apply_delay.pl LOG:  statement: ALTER 
SUBSCRIPTION tap_sub SET (min_apply_delay = 8646)
2022-09-14 12:00:48.910 UTC [11226] DEBUG:  sending feedback (force 0) to recv 
0/1690220, write 0/1690220, flush 0/1690220
2022-09-14 12:00:48.937 UTC [11208] DEBUG:  server process (PID 11328) exited 
with exit code 0
2022-09-14 12:00:48.950 UTC [11226] DEBUG:  logical replication apply delay: 
86459996 ms
2022-09-14 12:00:48.950 UTC [11226] CONTEXT:  processing remote data for 
replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 
0/16902A8
2022-09-14 12:00:48.979 UTC [11208] DEBUG:  forked new backend, pid=11334 
socket=6
2022-09-14 12:00:49.007 UTC [11334] 032_apply_delay.pl LOG:  statement: BEGIN;
2022-09-14 12:00:49.008 UTC [11334] 032_apply_delay.pl LOG:  statement: ALTER 
SUBSCRIPTION tap_sub DISABLE;
2022-09-14 12:00:49.009 UTC [11334] 032_apply_delay.pl LOG:  statement: SELECT 
pg_sleep(1);
2022-09-14 12:00:49.009 UTC [11226] DEBUG:  check status of MySubscription
2022-09-14 12:00:49.009 UTC [11226] CONTEXT:  processing remote data for 
replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 
0/16902A8
2022-09-14 12:00:49.009 UTC [11226] DEBUG:  logical replication apply delay: 
86459937 ms
2022-09-14 12:00:49.009 UTC [11226] CONTEXT:  processing remote data for 
replication origin "pg_16393" during "BEGIN" in transaction 734 finished at 
0/16902A8
...
```

I think this may be caused by the woken worker reading catalogs that have not
been modified yet.
In AlterSubscription(), the backend kicks the apply worker ASAP, but it should
be done at the
end of the transaction, like ApplyLauncherWakeupAtCommit() and
AtEOXact_ApplyLauncher().

```
+   /*
+* If this subscription has been disabled and 
it has an apply
+* delay set, wake up the logical replication 
worker to finish
+* it as soon as possible.
+*/
+   if (!opts.enabled && sub->applydelay > 0)
+   logicalrep_worker_wakeup(sub->oid, 
InvalidOid);
+
```
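
A rough sketch of deferring the wakeup to the end of the transaction,
borrowing the AtEOXact_ApplyLauncher() pattern (all the names below are
hypothetical):

```
/* Hypothetical sketch: remember the wakeup and perform it at commit. */
static Oid	pending_wakeup_subid = InvalidOid;

static void
LogicalRepWorkerWakeupAtCommit(Oid subid)
{
	pending_wakeup_subid = subid;
}

void
AtEOXact_LogicalRepWorkerWakeup(bool isCommit)
{
	if (isCommit && OidIsValid(pending_wakeup_subid))
		logicalrep_worker_wakeup(pending_wakeup_subid, InvalidOid);

	pending_wakeup_subid = InvalidOid;
}
```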

What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-13 Thread kuroda.hay...@fujitsu.com
Hi,

> > 01. filename
> > The word-ordering of filename seems not good
> > because you defined the new worker as "parallel apply worker".
> >
> 
> I think in the future we may have more files for apply work (like
> applyddl.c for DDL apply work), so it seems okay to name all apply
> related files in a similar way.

> > That flag is set only when an apply worker spill the transaction to the 
> > disk.
> > How about "in_streamed_transaction" -> "in_spilled_transaction"?
> >
> 
> Isn't this an existing variable? If so, it doesn't seem like a good
> idea to change the name unless we are changing its meaning.

Both of your points are reasonable. They do not have to be modified.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-13 Thread kuroda.hay...@fujitsu.com
Dear Hou-san,

> I will dig your patch more, but I send partially to keep the activity of the 
> thread.

More minor comments about v28.

===
About 0002 

For 015_stream.pl

14. check_parallel_log

```
+# Check the log that the streamed transaction was completed successfully
+# reported by parallel apply worker.
+sub check_parallel_log
+{
+   my ($node_subscriber, $offset, $is_parallel)= @_;
+   my $parallel_message = 'finished processing the transaction finish 
command';
+
+   if ($is_parallel)
+   {
+   $node_subscriber->wait_for_log(qr/$parallel_message/, $offset);
+   }
+}
```

I think check_parallel_log() should be called only when streaming = 'parallel',
so the if-statement inside it is not needed.

===
For 016_stream_subxact.pl

15. test_streaming

```
+   INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 
 500) s(i);
```

The whitespace in "generate_series(3,  500)" looks wrong; the double space
should be a single space, i.e. "generate_series(3, 500)".

===
About 0003

For applyparallelworker.c

16. parallel_apply_relation_check()

```
+   if (rel->parallel_apply_safe == PARALLEL_APPLY_SAFETY_UNKNOWN)
+   logicalrep_rel_mark_parallel_apply(rel);
```

It was not clear to me when logicalrep_rel_mark_parallel_apply() is called here.
IIUC parallel_apply_relation_check() is called when the parallel apply worker
handles changes,
but before that the relation is opened via logicalrep_rel_open() and
parallel_apply_safe is set there.
If this guards against some protocol violation, we may use Assert().
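
That is, if that state cannot legitimately be seen here, a guard like the
following sketch might be clearer:

```
	/* Sketch: fail loudly instead of silently re-marking the relation. */
	Assert(rel->parallel_apply_safe != PARALLEL_APPLY_SAFETY_UNKNOWN);
```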

===
For create_subscription.sgml

17.
The restriction about foreign keys does not seem to be documented.

===
About 0004

For 015_stream.pl

18. check_parallel_log

I heard that the removal has been reverted, but in the patch
check_parallel_log() is removed again... :-(


===
About throughout

I checked the test coverage via `make coverage`. For applyparallelworker.c
and worker.c, the function coverage is 100% for both, and the
line coverages are 86.2% and 94.5%. Generally that's good.
But reading the report, the following parts seem not to be tested.

In parallel_apply_start_worker():

```
if (tmp_winfo->error_mq_handle == NULL)
{
/*
 * Release the worker information and try next one if 
the parallel
 * apply worker exited cleanly.
 */
ParallelApplyWorkersList = 
foreach_delete_current(ParallelApplyWorkersList, lc);
shm_mq_detach(tmp_winfo->mq_handle);
dsm_detach(tmp_winfo->dsm_seg);
pfree(tmp_winfo);
}
```

In HandleParallelApplyMessage():

```
case 'X':   /* Terminate, 
indicating clean exit */
{
shm_mq_detach(winfo->error_mq_handle);
winfo->error_mq_handle = NULL;
break;
}
```

Does it mean that we do not test the termination of a parallel apply worker?
If so, I think it should be tested.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-09-12 Thread kuroda.hay...@fujitsu.com
Dear Hou-san,

Thank you for updating the patch! The following are comments for v28-0001.
I will dig into your patch more, but I'm sending these partial comments to
keep the thread active.

===
For applyparallelworker.c

01. filename
The word-ordering of filename seems not good
because you defined the new worker as "parallel apply worker".

02. global variable

```
+/* Parallel apply workers hash table (initialized on first use). */
+static HTAB *ParallelApplyWorkersHash = NULL;
+
+/*
+ * List that stores the information of parallel apply workers that were
+ * started. Newly added worker information will be removed from the list at the
+ * end of the transaction when there are enough workers in the pool. Besides,
+ * exited workers will be removed from the list after being detected.
+ */
+static List *ParallelApplyWorkersList = NIL;
```

Could you add a description of the difference between the list and the hash
table?
IIUC the hash table stores the parallel workers that
are assigned to transactions, and the list stores all alive ones.


03. parallel_apply_find_worker

```
+   /* Return the cached parallel apply worker if valid. */
+   if (stream_apply_worker != NULL)
+   return stream_apply_worker;
```

This is just a question -
why are the given xid and the xid assigned to the worker not compared here?
Is there a chance of finding the wrong worker?
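
For example, something like the following sketch (the shared->stream_xid field
name is my assumption based on the shared-memory struct):

```
	/* Sketch: also verify that the cached worker is assigned to this xid. */
	if (stream_apply_worker != NULL &&
		stream_apply_worker->shared->stream_xid == xid)
		return stream_apply_worker;
```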


04. parallel_apply_start_worker

```
+/*
+ * Start a parallel apply worker that will be used for the specified xid.
+ *
+ * If a parallel apply worker is not in use then re-use it, otherwise start a
+ * fresh one. Cache the worker information in ParallelApplyWorkersHash keyed by
+ * the specified xid.
+ */
+void
+parallel_apply_start_worker(TransactionId xid)
```

"parallel_apply_start_worker" should be "start_parallel_apply_worker", I think


05. parallel_apply_stream_abort

```
for (i = list_length(subxactlist) - 1; i >= 0; i--)
{
xid = list_nth_xid(subxactlist, i);
if (xid == subxid)
{
found = true;
break;
}
}
```

Please do not reuse xid; declare and use another variable in the else block or
something.
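
That is, something like:

```
	for (i = list_length(subxactlist) - 1; i >= 0; i--)
	{
		TransactionId cur_xid = list_nth_xid(subxactlist, i);

		if (cur_xid == subxid)
		{
			found = true;
			break;
		}
	}
```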

06. parallel_apply_free_worker

```
+   if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+   {
```

Please add a comment like: "Do we have enough workers in the pool?" or 
something.

===
For worker.c

07. general

In many places an if-else chain is used for apply_action, but I think it
should be rewritten as a switch statement.
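
A sketch of the suggested shape (the TA_* values are taken from the snippets
quoted later in this mail):

```
	switch (apply_action)
	{
		case TA_APPLY_IN_PARALLEL_WORKER:
			pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
			break;

		case TA_SEND_TO_PARALLEL_WORKER:
			/* ... */
			break;

		default:
			elog(ERROR, "unexpected apply action %d", (int) apply_action);
			break;
	}
```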

08. global variable

```
-static bool in_streamed_transaction = false;
+bool in_streamed_transaction = false;
```

a.

It seems that in_streamed_transaction is used only in worker.c, so we can
change it to a static variable.

b.

That flag is set only when an apply worker spills the transaction to disk.
How about "in_streamed_transaction" -> "in_spilled_transaction"?

09.  apply_handle_stream_prepare

```
-   elog(DEBUG1, "received prepare for streamed transaction %u", 
prepare_data.xid);
```

I think this debug message is still useful.

10. apply_handle_stream_stop

```
+   if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
+   {
+   pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
+   }
+   else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+   {
```

The ordering of STREAM {STOP, START} is checked only when an apply worker
spills the transaction to disk.
(This is done via in_streamed_transaction.)
I think a check should be added here, like if (!stream_apply_worker) or
something.

11. apply_handle_stream_abort

```
+   if (in_streamed_transaction)
+   ereport(ERROR,
+   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+errmsg_internal("STREAM ABORT message without 
STREAM STOP")));
```

I think the check by stream_apply_worker should be added.

12. apply_handle_stream_commit

a.

```
if (in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
 errmsg_internal("STREAM COMMIT message without 
STREAM STOP")));
```

I think the check by stream_apply_worker should be added.

b. 

```
-   elog(DEBUG1, "received commit for streamed transaction %u", xid);
```

I think this debug message is still useful.

===
For launcher.c

13. logicalrep_worker_stop_by_slot

```
+   LogicalRepWorker *worker = &LogicalRepCtx->workers[slot_no];
+
+   LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+   /* Return if the generation doesn't match or the worker is not alive. */
+   if (worker->generation != generation ||
+   worker->proc == NULL)
+   return;
+
```

a.

LWLockAcquire(LogicalRepWorkerLock) is needed before reading slots.

b.

LWLockRelease(LogicalRepWorkerLock) seems to be missing in the early-return
path above.
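
That is, the early-return path should release the lock first, something like:

```
	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	/* Return if the generation doesn't match or the worker is not alive. */
	if (worker->generation != generation ||
		worker->proc == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}
```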

RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-06 Thread kuroda.hay...@fujitsu.com
Dear Amit,

Thanks for giving comments!

> Did you get this new assertion failure after you applied the patch for
> the first failure? Because otherwise, how can you reach it with the
> same test case?

The first failure occurred only on HEAD, so I did not apply the first patch
to REL14 and REL15.
This difference is caused because the commit [Fix catalog lookup...] in
REL15 (272248a) and older is different
from the HEAD one.
In older versions SnapBuildXidSetCatalogChanges() has been added. In that
function
a transaction will be marked as containing catalog changes if the transaction
is in InitialRunningXacts,
and after that the relationship between sub and top transactions is assigned
based on parsed->subxact.
The marking avoids the first failure, but the assignment triggers the new
failure.


> About patch:
> else if (sub_needs_timetravel)
>   {
> - /* track toplevel txn as well, subxact alone isn't meaningful */
> + elog(DEBUG2, "forced transaction %u to do timetravel due to one of
> its subtransaction",
> + xid);
> + needs_timetravel = true;
>   SnapBuildAddCommittedTxn(builder, xid);
> 
> Why did you remove the above comment? I think it still makes sense to retain 
> it.

Fixed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



HEAD-v2-0001-Fix-assertion-failure-during-logical-decoding.patch
Description:  HEAD-v2-0001-Fix-assertion-failure-during-logical-decoding.patch


RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-06 Thread kuroda.hay...@fujitsu.com
> I was not sure what's the proper way to fix it.
> The solution I've thought at first was transporting all invalidations from 
> sub to top
> like ReorderBufferTransferSnapToParent(),
> but I do not know its side effect. Moreover, how do we deal with
> ReorderBufferChange?
> Should we transfer them too? If so, how about the ordering of changes?
> Alternative solustion was just remove the assertion, but was it OK?

PSA the PoC patch for discussion. In this patch only invalidation messages are
transported;
changes held by the subtransaction are ignored. This passes the reported
workload.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



REL14-0001-PoC-Fix-assertion-failure-during-logical-decoding.patch
Description:  REL14-0001-PoC-Fix-assertion-failure-during-logical-decoding.patch


RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-06 Thread kuroda.hay...@fujitsu.com
Dear hackers,

> I agreed both that DEBUG2 messages are still useful but we should not
> change the condition for output. So I prefer the idea suggested by Amit.

PSA a newer patch, which contains the fix and a test.

> > I have not verified but I think we need to backpatch this till 14
> > because prior to that in DecodeCommit, we use to set the top-level txn
> > as having catalog changes based on the if there are invalidation
> > messages in the commit record. So, in the current scenario shared by
> > Osumi-San, before SnapBuildCommitTxn(), the top-level txn will be
> > marked as having catalog changes.
> 
> I and Osumi-san are now investigating that, so please wait further reports and
> patches.

We investigated older versions, and in some versions *another
stack trace* was found.


On PG10-13, indeed, the failure did not occur.
In these versions a transaction is regarded as
having catalog changes when the commit record has the XACT_XINFO_HAS_INVALS
flag.
This flag will be set if the transaction has invalidation messages.

When a subtransaction changes system catalogs and the user commits,
all invalidation messages allocated in the subtransaction will be transferred
to the top transaction.
Therefore both transactions will be marked as containing catalog changes.


On PG14 and 15, however, another stack trace has been found.
While executing the same workload, we got the following at the same SQL
statement:

```
(gdb) backtrace
#0  0x7fa78c6dc387 in raise () from /lib64/libc.so.6
#1  0x7fa78c6dda78 in abort () from /lib64/libc.so.6
#2  0x00b16680 in ExceptionalCondition (conditionName=0xcd3ab0 
"txn->ninvalidations == 0", errorType=0xcd3284 "FailedAssertion", 
fileName=0xcd32d0 "reorderbuffer.c", lineNumber=2936) at assert.c:69
#3  0x008e9e70 in ReorderBufferForget (rb=0x12b5b10, xid=735, 
lsn=24125384) at reorderbuffer.c:2936
#4  0x008d9493 in DecodeCommit (ctx=0x12a2d20, buf=0x7ffe08b236b0, 
parsed=0x7ffe08b23510, xid=734, two_phase=false) at decode.c:733
#5  0x008d8962 in DecodeXactOp (ctx=0x12a2d20, buf=0x7ffe08b236b0) at 
decode.c:279
#6  0x008d85e2 in LogicalDecodingProcessRecord (ctx=0x12a2d20, 
record=0x12a30e0) at decode.c:142
#7  0x008dfef2 in pg_logical_slot_get_changes_guts (fcinfo=0x129acb0, 
confirm=true, binary=false) at logicalfuncs.c:296
#8  0x008e002f in pg_logical_slot_get_changes (fcinfo=0x129acb0) at 
logicalfuncs.c:365
...
(gdb) frame 4
#4  0x008d9493 in DecodeCommit (ctx=0x14cfd20, buf=0x7ffc638b0ca0, 
parsed=0x7ffc638b0b00, xid=734, two_phase=false) at decode.c:733
733 ReorderBufferForget(ctx->reorder, 
parsed->subxacts[i], buf->origptr);
(gdb) list
728  */
729 if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
730 {
731 for (i = 0; i < parsed->nsubxacts; i++)
732 {
733 ReorderBufferForget(ctx->reorder, 
parsed->subxacts[i], buf->origptr);
734 }
735 ReorderBufferForget(ctx->reorder, xid, buf->origptr);
736 
737 return;
(gdb) frame 3
#3  0x008e9e70 in ReorderBufferForget (rb=0x14e2b10, xid=735, 
lsn=24125152) at reorderbuffer.c:2936
2936Assert(txn->ninvalidations == 0);
(gdb) list
2931 */
2932if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2933ReorderBufferImmediateInvalidation(rb, 
txn->ninvalidations,
2934
   txn->invalidations);
2935else
2936Assert(txn->ninvalidations == 0);
2937 
2938/* remove potential on-disk data, and deallocate */
2939ReorderBufferCleanupTXN(rb, txn);
2940}
(gdb) print *txn
$1 = {txn_flags = 3, xid = 735, toplevel_xid = 734, gid = 0x0, first_lsn = 
24113488, final_lsn = 24125152, end_lsn = 0, toptxn = 0x14ecb98, 
  restart_decoding_lsn = 24113304, origin_id = 0, origin_lsn = 0, commit_time = 
0, base_snapshot = 0x0, base_snapshot_lsn = 0, 
  base_snapshot_node = {prev = 0x14ecc00, next = 0x14e2b28}, snapshot_now = 
0x0, command_id = 4294967295, nentries = 5, nentries_mem = 5, 
  changes = {head = {prev = 0x14eecf8, next = 0x14eeb18}}, tuplecids = {head = 
{prev = 0x14ecb10, next = 0x14ecb10}}, ntuplecids = 0, 
  tuplecid_hash = 0x0, toast_hash = 0x0, subtxns = {head = {prev = 0x14ecb38, 
next = 0x14ecb38}}, nsubtxns = 0, ninvalidations = 3, 
  invalidations = 0x14e2d28, node = {prev = 0x14ecc68, next = 0x14ecc68}, size 
= 452, total_size = 452, concurrent_abort = false, 
  output_plugin_private = 0x0}
```

In these versions DecodeCommit() said OK. However, we met another failure
because the ReorderBufferTXN of the subtransaction had invalidation messages
but did not have a base_snapshot.

I thought that this failure occurred only when the base_snapsh

RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-02 Thread kuroda.hay...@fujitsu.com
> > I'm basically fine, too. But this is a bug that needs back-patching
> > back to 10.
> >
> 
> I have not verified but I think we need to backpatch this till 14
> because prior to that in DecodeCommit, we use to set the top-level txn
> as having catalog changes based on the if there are invalidation
> messages in the commit record. So, in the current scenario shared by
> Osumi-San, before SnapBuildCommitTxn(), the top-level txn will be
> marked as having catalog changes.

Osumi-san and I are now investigating that, so please wait for further reports
and patches.

> > This change changes the condition for the DEBUG2 message.
> > So we need to add an awkward if() condition for the DEBUG2 message.
> > Looking that the messages have different debug-level, I doubt there
> > have been a chance they are useful.  If we remove the two DEBUGx
> > messages, I'm fine with the change.
> >
> 
> I think these DEBUG2 messages could be useful, so instead of removing
> these, I suggest we should follow Dilip's proposed fix and maybe add a
> new DEBUG2 message on the lines of (("forced transaction %u to do
> timetravel due to one of its subtransaction", xid) in the else if
> (sub_needs_timetravel) condition if we think that will be useful too
> but I am fine leaving the addition of new DEBUG2 message.

I agree on both points: the DEBUG2 messages are still useful, but we should not
change the condition for output. So I prefer the idea suggested by Amit.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-01 Thread kuroda.hay...@fujitsu.com
Dear Horiguchi-san, Dilip,

Thank you for replying!

> > It seems that SnapBuildCommitTxn() is already taking care of adding
> > the top transaction to the committed transaction if any subtransaction
> > has the catalog changes, it has just missed setting the flag so I
> > think just setting the flag like this should be sufficient no?
> 
> Oops! That's right.

Basically I agreed, but I was not sure whether the message "found top level
transaction..."
should be output or not. It may be useful even if only one of the
subtransactions contains the change.

How about the following?

diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index bf72ad45ec..a630522907 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1086,8 +1086,17 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
}
}
 
-   /* if top-level modified catalog, it'll need a snapshot */
-   if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
+   /*
+* if top-level or one of sub modified catalog, it'll need a snapshot.
+*
+* Normally the second check is not needed because the relation between
+* top-sub transactions is tracked on the ReorderBuffer layer, and the 
top
+* transaction is marked as containing catalog changes if its children 
are.
+* But in some cases the relation may be missed, in which case only the 
sub
+* transaction may be marked as containing catalog changes.
+*/
+   if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)
+   || sub_needs_timetravel)
{
elog(DEBUG2, "found top level transaction %u, with catalog 
changes",
 xid);
@@ -1095,11 +1104,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid,
needs_timetravel = true;
SnapBuildAddCommittedTxn(builder, xid);
}
-   else if (sub_needs_timetravel)
-   {
-   /* track toplevel txn as well, subxact alone isn't meaningful */
-   SnapBuildAddCommittedTxn(builder, xid);
-   }
else if (needs_timetravel)
{
elog(DEBUG2, "forced transaction %u to do timetravel", xid);

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: test_decoding assertion failure for the loss of top-sub transaction relationship

2022-09-01 Thread kuroda.hay...@fujitsu.com
Hi Hackers,

> Therefore, this leads to the failure for the assert that can check
> the consistency that when one sub transaction modifies the catalog,
> its top transaction should be marked so as well.
> 
> I feel we need to remember the relationship between top transaction and sub
> transaction
> in the serialized snapshot even before changing catalog at decoding
> RUNNING_XACT,
> so that we can keep track of the association after the restart. What do you 
> think ?

PSA a patch that fixes the failure.
This adds pairs of sub-top transactions to SnapBuild, which will be
serialized and restored.
The pairs will be checked when we mark the ReorderBufferTXN as
RBTXN_HAS_CATALOG_CHANGES.

Thanks to off-list discussion with Osumi-san.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



0001-mark-RBTXN_HAS_CATALOG_CHANGES-to-the-top-transactio.patch
Description:  0001-mark-RBTXN_HAS_CATALOG_CHANGES-to-the-top-transactio.patch


RE: patch: Add missing descriptions for rmgr APIs

2022-08-29 Thread kuroda.hay...@fujitsu.com
> Your observation seems correct to me but you have not updated the
> comment for the mask. Is there a reason for the same?

Oh, it seems that I attached the wrong one. There is no reason.
PSA the newer version.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v2-0001-add-a-missing-comment.patch
Description: v2-0001-add-a-missing-comment.patch


patch: Add missing descriptions for rmgr APIs

2022-08-28 Thread kuroda.hay...@fujitsu.com
Hi hackers,

While reading code related to logical decoding,
I thought that the following comment in rmgrlist.h is inconsistent.

> /* symbol name, textual name, redo, desc, identify, startup, cleanup */

This comment describes the set of APIs that a resource manager should have, but
the functions for {mask, decode} are missing here.

Do we have any reasons for that? I thought it might be unfriendly, so I
attached a patch.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



0001-add-a-missing-comment.patch
Description: 0001-add-a-missing-comment.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2022-08-23 Thread kuroda.hay...@fujitsu.com
Dear Wang,

The following are my comments about v23-0003. Currently I do not have any
comments about 0002 and 0004.

09. general

It seems that logicalrep_rel_mark_parallel_apply() is always called when
relations are opened on the subscriber side,
but is it really needed? These checks are required only for parallel streaming
apply,
so they may not be needed in case of streaming = 'on' or 'off'.

10. commit message

2) There cannot be any non-immutable functions used by the subscriber-side
replicated table. Look for functions in the following places:
* a. Trigger functions
* b. Column default value expressions and domain constraints
* c. Constraint expressions
* d. Foreign keys

"Foreign key" should not be listed here because it is not related with the 
mutability. I think it should be listed as 3), not d..

11. create_subscription.sgml

The constraint about foreign keys should be described here.

11. relation.c

11.a

+   CacheRegisterSyscacheCallback(PROCOID,
+ 
logicalrep_relmap_reset_parallel_cb,
+ (Datum) 0);

Isn't another syscache callback needed for pg_type?
Users can add constraints via the ALTER DOMAIN command, but the added
constraint may not be checked.
I checked AlterDomainAddConstraint(), and it only invalidates the cache for
pg_type.

11.b

+   /*
+* If the column is of a DOMAIN type, determine whether
+* that domain has any CHECK expressions that are not
+* immutable.
+*/
+   if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+   {

I think the default value of the *domain* must also be checked here.
I tested it as follows.

===
1. created a domain that has a default value
CREATE DOMAIN tmp INT DEFAULT 1 CHECK (VALUE > 0);

2. created a table 
CREATE TABLE foo (id tmp PRIMARY KEY);

3. checked pg_attribute and pg_class
select oid, relname, attname, atthasdef from pg_attribute, pg_class where 
pg_attribute.attrelid = pg_class.oid and pg_class.relname = 'foo' and attname = 
'id';
  oid  | relname | attname | atthasdef 
---+-+-+---
 16394 | foo | id  | f
(1 row)

It meant that the functions might not be checked because the if-statement `if
(att->atthasdef)` became false.
===

12. 015_stream.pl, 016_stream_subxact.pl, 022_twophase_cascade.pl, 
023_twophase_stream.pl

-   my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
+   my ($node_publisher, $node_subscriber, $appname) = @_;

Why is the parameter removed? I think the test that waits for the output
from the apply background worker is meaningful.

13. 032_streaming_apply.pl

The filename seems too general because apply background workers are also
tested in the tests above.
How about "streaming_apply_constraint" or something?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-08-22 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thank you for updating the patch! The following are comments about v23-0001
and v23-0005.

v23-0001

01. logical-replication.sgml

+  
+   When the streaming mode is parallel, the finish LSN of
+   failed transactions may not be logged. In that case, it may be necessary to
+   change the streaming mode to on and cause the same
+   conflicts again so the finish LSN of the failed transaction will be written
+   to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ...
+   SKIP.
+  

I was not sure about the streaming='off' mode. Is there any reason that only
the 'on' mode is mentioned?

02. protocol.sgml

+  
+   Int64 (XLogRecPtr)
+   
+
+ The LSN of the abort. This field is available since protocol version
+ 4.
+
+   
+  
+
+  
+   Int64 (TimestampTz)
+   
+
+ Abort timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01). This field is
+ available since protocol version 4.
+
+   
+  
+

It seems that these changes are in the variablelist for stream commit.
I think they belong to the stream abort message, so they should be moved.

03. decode.c

-   ReorderBufferForget(ctx->reorder, parsed->subxacts[i], 
buf->origptr);
+   ReorderBufferForget(ctx->reorder, parsed->subxacts[i], 
buf->origptr,
+   commit_time);
}
-   ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+   ReorderBufferForget(ctx->reorder, xid, buf->origptr, 
commit_time);

'commit_time' is passed as the argument 'abort_time'; I think that may be
confusing.
How about adding a comment above, like:
"In case of streamed transactions, they are regarded as being aborted at 
commit_time"

04. launcher.c

04.a

+   worker->main_worker_pid = is_subworker ? MyProcPid : 0;

You can use InvalidPid instead of 0.
(I thought pid should be represented by the datatype pid_t, but in some codes 
it is defined as int...) 

04.b

+   worker->main_worker_pid = 0;

You can use InvalidPid instead of 0, same as above.
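
That is, something like:

```
	/* 04.a */
	worker->main_worker_pid = is_subworker ? MyProcPid : InvalidPid;

	/* 04.b */
	worker->main_worker_pid = InvalidPid;
```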

05. origin.c

 void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int acquired_by)

IIUC the same slot can be used only when the main apply worker has already
acquired the slot
and the subworker for the same subscription tries to acquire it, but that
cannot be understood from the comments.
How about adding comments, or an assertion that acquired_by is the same as
session_replication_state->acquired_by?
Moreover, acquired_by should be compared with InvalidPid, based on the above
comments.
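
A sketch of the suggested assertion (its exact placement inside
replorigin_session_setup() is my assumption):

```
	/* Sketch: when reusing an already-acquired slot, the pids must match. */
	if (acquired_by != InvalidPid)
		Assert(acquired_by == session_replication_state->acquired_by);
```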

06. proto.c

 void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
- TransactionId subxid)
+ ReorderBufferTXN 
*txn, XLogRecPtr abort_lsn,
+ bool write_abort_lsn

I think write_abort_lsn may not be needed,
because abort_lsn can be used to control whether the abort_XXX fields should
be filled or not.

07. worker.c

+/*
+ * The number of changes during one streaming block (only for apply background
+ * workers)
+ */
+static uint32 nchanges = 0;

This variable is used only by the main apply worker, so the comment seems
incorrect.
How about "...(only for the SUBSTREAM_PARALLEL case)"?

v23-0005

08. monitoring.sgml

I cannot decide which option proposed in [1] is better, but the following
descriptions are needed in both cases.
(In [2] I had intended to propose something like option 2.)

08.a

You can add a description that the field 'relid' will be NULL even for apply 
background worker.

08.b

You can add a description that fields 'received_lsn', 'last_msg_send_time', 
'last_msg_receipt_time',
'latest_end_lsn', 'latest_end_time' will be NULL for apply background worker.


[1]: 
https://www.postgresql.org/message-id/CAHut%2BPuPwdwZqXBJjtU%2BR9NULbOpxMG%3Di2hmqgg%2B7p0rmK0hrw%40mail.gmail.com
[2]: 
https://www.postgresql.org/message-id/TYAPR01MB58660B4732E7F80B322174A3F5629%40TYAPR01MB5866.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-08-09 Thread kuroda.hay...@fujitsu.com
Hi Wang,

> 6.a
> 
> It seems that the upper line represents the apply background worker, but I 
> think
> last_msg_send_time and last_msg_receipt_time should be null.
> Is it like initialization mistake?

I checked the issue again.

Attributes worker->last_send_time, worker->last_recv_time, and 
worker->reply_time
are initialized in logicalrep_worker_launch():

```
...
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
...
```

The macro is defined in timestamp.h, and it seems that the values are
initialized to PG_INT64_MIN.

```
#define DT_NOBEGIN  PG_INT64_MIN
#define DT_NOENDPG_INT64_MAX

#define TIMESTAMP_NOBEGIN(j)\
do {(j) = DT_NOBEGIN;} while (0)
```


However, in pg_stat_get_subscription(), these values are regarded as null if 
they are zero.

```
if (worker.last_send_time == 0)
nulls[4] = true;
else
values[4] = TimestampTzGetDatum(worker.last_send_time);
if (worker.last_recv_time == 0)
nulls[5] = true;
else
values[5] = TimestampTzGetDatum(worker.last_recv_time);
```

I think the above lines are wrong; these values should be compared with
PG_INT64_MIN (DT_NOBEGIN).
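
That is, something like:

```
	if (worker.last_send_time == DT_NOBEGIN)
		nulls[4] = true;
	else
		values[4] = TimestampTzGetDatum(worker.last_send_time);
```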

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-08-09 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thanks for updating the patch sets! The following are comments about v20-0001.

1. config.sgml

```
   
Specifies maximum number of logical replication workers. This includes
both apply workers and table synchronization workers.
   
```

I think you can add a description in the above paragraph, like
" This includes apply main workers, apply background workers, and table 
synchronization workers."

2. logical-replication.sgml

2.a Configuration Settings

```
   max_logical_replication_workers must be set to at least
   the number of subscriptions, again plus some reserve for the table
   synchronization.
```

I think you can add a description in the above paragraph, like
"... the number of subscriptions, plus some reserve for the table 
synchronization
 and the streaming transaction."

2.b Monitoring

```
  
   Normally, there is a single apply process running for an enabled
   subscription.  A disabled subscription or a crashed subscription will have
   zero rows in this view.  If the initial data synchronization of any
   table is in progress, there will be additional workers for the tables
   being synchronized.
  
```

I think you can add a sentence to the above paragraph, like
"... synchronized. Moreover, if streaming transactions are applied in parallel,
there will be additional workers"

3. launcher.c

```
+   /* Sanity check : we don't support table sync in subworker. */
```

I think "Sanity check :" should be "Sanity check:", per other files.

4. worker.c

4.a handle_streamed_transaction()

```
-   /* not in streaming mode */
-   if (!in_streamed_transaction)
+   /* Not in streaming mode */
+   if (!(in_streamed_transaction || am_apply_bgworker()))
```

I think the comment should also mention the apply background worker case.

4.b handle_streamed_transaction()

```
-   Assert(stream_fd != NULL);
```

I think this assertion seems reasonable in the case of stream='on'.
Could you revive it and move it to a later part of the function, like after
subxact_info_add(current_xid)?

4.c apply_handle_prepare_internal()

```
 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
 * called within the PrepareTransactionBlock below.
 */
-   BeginTransactionBlock();
+   if (!IsTransactionBlock())
+   BeginTransactionBlock();
+
```

I think the comment should be "We must be in a transaction block to balance...".

4.d apply_handle_stream_prepare()

```
- *
- * Logic is in two parts:
- * 1. Replay all the spooled operations
- * 2. Mark the transaction as prepared
  */
 static void
 apply_handle_stream_prepare(StringInfo s)
```

I think these comments are useful when stream='on',
so they should be moved to a later part.

5. applybgworker.c

5.a apply_bgworker_setup()

```
+   elog(DEBUG1, "setting up apply worker #%u", 
list_length(ApplyBgworkersList) + 1); 
```

"apply worker" should be "apply background worker".

5.b LogicalApplyBgwLoop()

```
+   elog(DEBUG1, "[Apply BGW #%u] ended processing 
streaming chunk,"
+"waiting on shm_mq_receive", 
shared->worker_id);
```

A blank is needed after the comma. I checked the server log, and the message
was output like:

```
[Apply BGW #1] ended processing streaming chunk,waiting on shm_mq_receive
```

6.

When I started up the apply background worker and ran `SELECT * from
pg_stat_subscription`, I got the following rows:

```
postgres=# select * from pg_stat_subscription;
 subid | subname |  pid  | relid | received_lsn |  last_msg_send_time   
| last_msg_receipt_time | latest_end_lsn |latest_end
_time
---+-+---+---+--+---+---++--
-
 16400 | sub | 22383 |   |  | -infinity 
| -infinity || -infinity
 16400 | sub | 22312 |   | 0/6734740| 2022-08-09 07:40:19.367676+00 
| 2022-08-09 07:40:19.375455+00 | 0/6734740  | 2022-08-09 07:40:
19.367676+00
(2 rows)
```


6.a

It seems that the upper row represents the apply background worker, but I
think last_msg_send_time and last_msg_receipt_time should be null.
Is it an initialization mistake?

```
$ ps aux | grep 22383
... postgres: logical replication apply background worker for subscription 16400
```

6.b

Currently, the documentation doesn't clarify how to determine the type
of a logical replication worker.
Could you add a description about it?
I think adding a column "subworker" is an alternative approach.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Perform streaming logical transactions by background workers and parallel apply

2022-07-27 Thread kuroda.hay...@fujitsu.com
Dear Wang,

I have some further comments about the test code.

11. src/test/regress/sql/subscription.sql

```
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' 
PUBLICATION testpub WITH (connect = false, streaming = foo);
```

The comment is no longer correct: it should be "streaming must be boolean or
'parallel'"

12. src/test/regress/sql/subscription.sql

```
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' 
PUBLICATION testpub WITH (connect = false, streaming = true);
```

I think we should test the case of streaming = 'parallel'.

13. 015_stream.pl

I could not find a test for TRUNCATE. IIUC the apply bgworker works well
even if it gets a LOGICAL_REP_MSG_TRUNCATE message from the main worker.
Can you add such a case?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Support load balancing in libpq

2022-07-27 Thread kuroda.hay...@fujitsu.com
Dear Jelte,

> With plain Postgres this assumption is probably correct. But the main reason
> I'm interested in this patch was because I would like to be able to load
> balance across the workers in a Citus cluster. These workers are all 
> primaries.
> Similar usage would likely be possible with BDR (bidirectional replication).

I agree this feature is useful for BDR-like solutions.

> If the user takes such care when building their host list, they could simply 
> not add the load_balance_hosts=true option to the connection string.
> If you know there's only one primary in the list and you're looking for
> the primary, then there's no reason to use load_balance_hosts=true.

You meant that it is the user's responsibility to set it correctly, right?
That seems reasonable because libpq is just a library for connecting to the
server and should not be too smart.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Perform streaming logical transactions by background workers and parallel apply

2022-07-27 Thread kuroda.hay...@fujitsu.com
Dear Wang-san,

Hi, I'm also interested in the patch, so I started to review it.
The following are comments about 0001.

1. terminology

In your patch a new worker, the "apply background worker", has been introduced,
but I thought it might be confusing because PostgreSQL already has the term
"background worker".
Both the apply worker and the apply bgworker are categorized as bgworkers.
Do you have any reasons not to use "apply parallel worker" or "apply streaming
worker"?
(Note that I'm not a native English speaker.)

2. logicalrep_worker_stop()

```
-   /* No worker, nothing to do. */
-   if (!worker)
-   {
-   LWLockRelease(LogicalRepWorkerLock);
-   return;
-   }
+   if (worker)
+   logicalrep_worker_stop_internal(worker);
+
+   LWLockRelease(LogicalRepWorkerLock);
+}
```

I thought you could add a comment about the meaning of the if statement, like
"No main apply worker, nothing to do".

3. logicalrep_workers_find()

I thought you could add a description of the difference between this and
logicalrep_worker_find() at the top of the function.
IIUC logicalrep_workers_find() counts subworkers, but logicalrep_worker_find()
does not consider such workers.

4. logicalrep_worker_detach()

```
static void
 logicalrep_worker_detach(void)
 {
+   /*
+* If we are the main apply worker, stop all the apply background 
workers
+* we started before.
+*
```

I thought "we are" should be "This is", based on other comments.

5. applybgworker.c

```
+/* Apply background workers hash table (initialized on first use) */
+static HTAB *ApplyWorkersHash = NULL;
+static List *ApplyWorkersFreeList = NIL;
+static List *ApplyWorkersList = NIL;
```

I thought they should be ApplyBgWorkersXXX, because they store information
only related to apply bgworkers.

6. ApplyBgworkerShared

```
+   TransactionId   stream_xid;
+   uint32  n;  /* id of apply background worker */
+} ApplyBgworkerShared;
```

I thought the field "n" is too general; how about "proc_id" or "worker_id"?

7. apply_bgworker_wait_for()

```
+   /* If any workers (or the postmaster) have died, we have 
failed. */
+   if (status == APPLY_BGWORKER_EXIT)
+   ereport(ERROR,
+   
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("background worker %u failed to 
apply transaction %u",
+   wstate->shared->n, 
wstate->shared->stream_xid)))
```

7.a
I thought we should not mention PM death here, because in this case
the apply worker will exit at WaitLatch().

7.b
The error message should be "apply background worker %u...".

8. apply_bgworker_check_status()

```
+errmsg("background worker %u exited 
unexpectedly",
+   wstate->shared->n)));
```

The error message should be "apply background worker %u...".


9. apply_bgworker_send_data()

```
+   if (result != SHM_MQ_SUCCESS)
+   ereport(ERROR,
+   
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("could not send tuples to shared-memory 
queue")));
```

I thought the error message should be "could not send data to..."
because the sent data might not be tuples. For example, in case of STREAM
PREPARE, I think it does not contain tuples.

10. wait_event.h

```
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
+   WAIT_EVENT_LOGICAL_APPLY_WORKER_STATE_CHANGE,
WAIT_EVENT_LOGICAL_SYNC_DATA,
```

I thought the event should be WAIT_EVENT_LOGICAL_APPLY_BG_WORKER_STATE_CHANGE,
because this is used when the apply worker waits until the status of a
bgworker changes.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Collect ObjectAddress for ATTACH DETACH PARTITION to use in event trigger

2022-07-15 Thread kuroda.hay...@fujitsu.com
Dear Hou-san,

> Thanks for having a look. It was a bit difficult to add a test for this.
> Because we currently don't have a user function which can return these
> collected ObjectAddresses for ALTER TABLE. And It seems we don't have tests 
> for
> already collected ObjectAddresses as well :(
> The collected ObjectAddresses is in
> "currentEventTriggerState->currentCommand->d.alterTable.subcmds.address" while
> the public function pg_event_trigger_ddl_commands doesn't return these
> information. It can only be used in user defined event trigger function (C
> code).

Thanks for explaining. I did not know the reason why the test is not in 
event_trigger.sql.

> If we want to add some tests for both already existed and newly added
> ObjectAddresses, we might need to add some test codes in test_ddl_deparse.c.
> What do you think ?

I thought tests for ObjectAddresses should be added to test_ddl_deparse.c, but
that might be big because there are many ATExecXXX() functions.
I thought they could be added separately in another thread or patch.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Support load balancing in libpq

2022-07-14 Thread kuroda.hay...@fujitsu.com
Dear Jelte,

I like your idea. But do we have to sort randomly even if target_session_attr 
is set to 'primary' or 'read-write'?

I think this parameter can be used when all the listed servers have the same
data,
and we can assume that one of the members is a primary and the others are
secondaries.

In this case the user may add the primary host to the top of the list,
so random sorting may increase the time for establishing a connection.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Collect ObjectAddress for ATTACH DETACH PARTITION to use in event trigger

2022-07-14 Thread kuroda.hay...@fujitsu.com
Hi,

> > I noticed that we didn't collect the ObjectAddress returned by
> > ATExec[Attach|Detach]Partition. I think collecting this information can 
> > make it
> > easier for users to get the partition OID of the attached or detached table 
> > in
> > the event trigger. So how about collecting it like the attached patch ?
> 
> Added to next CF.

Sounds good. I grepped the ATExecXXX() functions called in ATExecCmd(),
and I confirmed that all returned values have been collected except these.

While checking the test code about EVENT TRIGGER,
I found there were no tests related to partitions there.
How about adding them?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: Multi-Master Logical Replication

2022-06-14 Thread kuroda.hay...@fujitsu.com
Dear Takahashi-san,

Thanks for giving feedback!

> > I don't know if it requires the kind of code you are thinking but I
> > agree that it is worth considering implementing it as an extension.
> 
> I think the other advantage to implement as an extension is that users could
> install the extension to older Postgres.
> 
> As mentioned in previous email, the one use case of n-way replication is 
> migration
> from older Postgres to newer Postgres.
> 
> If we implement as an extension, users could use n-way replication for 
> migration
> from PG10 to PG16.
>

I think even if LRG is implemented as a contrib module or an extension,
it will deeply depend on the subscription option "origin" proposed in [1].
So LRG cannot be used for older versions, only PG16 or later.

[1]: 
https://www.postgresql.org/message-id/caldanm3pt1cpeb3y9pe7ff91gzvpnxr91y4ztwiw6h+gayg...@mail.gmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Multi-Master Logical Replication

2022-06-06 Thread kuroda.hay...@fujitsu.com
Dear hackers,

I found another use-case for LRG. It might be helpful for migration.


LRG for migration
--
LRG may be helpful for machine migration, OS upgrades,
or upgrading PostgreSQL itself.

Assume that users want to migrate a database to another environment,
e.g., from PG16 on RHEL7 to PG18 on RHEL8.
Users must copy all data to the new server and catch up on all subsequent changes.
In this case streaming replication cannot be used,
because it requires the same OS and the same PostgreSQL major version.
Moreover, it is desirable to be able to return to the original environment at any time
in case of application or other environmental problems.


Operation steps with LRG
--

LRG is appropriate for this situation. The following is the workflow that users
would perform:

1. Copy the table definitions to the newer node (PG18) via pg_dump/pg_restore
2. Execute lrg_create() on the older node (PG16)
3. Execute lrg_node_attach() on PG18

=== data will be shared here ===

4. Change the connection of the user application to PG18
5. Check whether any ERROR is raised. If some ERRORs are raised,
   users can switch the connection back to PG16.
6. Remove the created node group if the application works well.

These operations may reduce system downtime
due to incompatibilities associated with version upgrades.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Multi-Master Logical Replication

2022-05-18 Thread kuroda.hay...@fujitsu.com
Hi hackers,

I created a small PoC. Please see the attached patches.

REQUIREMENT

Before patching them, patches in [1] must also be applied.


DIFFERENCES FROM PREVIOUS DESCRIPTIONS

* LRG is now implemented as SQL functions, not as a contrib module.
* New tables are added as system catalogs. Therefore, the added tables have an
oid column.
* The node_id is the concatenation of the system identifier and the dbid.


HOW TO USE

In the document patch, a subsection 'Example' was added to help understand LRG.
In short, we can do:

1. lrg_create on one node
2. lrg_node_attach on another node

Also attached is a test script that constructs a three-node system.


LIMITATIONS

This feature is under development, so there are many limitations for now:

* The function for detaching a node from a group is not implemented.
* The function for removing a group is not implemented.
* LRG does not lock system catalogs and databases. Concurrent operations may
cause an inconsistent state.
* LRG does not wait until the upstream node reaches the latest LSN of the
remaining nodes.
* LRG does not support initial data synchronization. That is, it works well
only when no node has initial data.


[1]: https://commitfest.postgresql.org/38/3610/

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v1-0001-PoC-implement-LRG.patch
Description: v1-0001-PoC-implement-LRG.patch


v1-0002-add-doc.patch
Description: v1-0002-add-doc.patch


test.sh
Description: test.sh


RE: Multi-Master Logical Replication

2022-04-28 Thread kuroda.hay...@fujitsu.com
Dear Laurenz,

Thank you for your interest in our works!

> I am missing a discussion how replication conflicts are handled to
> prevent replication from breaking or the databases from drifting apart.

Actually, we have no plans to develop a conflict-avoidance feature.
We think it should be provided as a core PUB/SUB feature, and
this module will just use it.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-04-06 Thread kuroda.hay...@fujitsu.com
Dear Peter,

> FYI, here is a test script that is using the current patch (v6) to
> demonstrate a way to share table data between different numbers of
> nodes (up to 5 of them here).

Thanks for sharing your script! It's very helpful for us.

While reading your script, however, a question came up.
On lines 121-122, you defined the subscriptions for the 2-node cluster:

psql -p $port_N1 -c "create subscription sub12 connection 'port=$port_N2' 
publication pub2 with ($copy_force);"
psql -p $port_N2 -c "create subscription sub21 connection 'port=$port_N1' 
publication pub1 with ($copy_force);"

But I was not sure it works well.
N2 already has data shared from N1 when subscription sub21 is created.
Did you assume that the initial copy is slow enough that data synchronization
is not yet finished when sub21 is created?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-03-28 Thread kuroda.hay...@fujitsu.com
Dear Amit, Wang,

> > I think maybe we do not need to deal with this use case.
> > The maximum number of table columns allowed by PG is 1600
> > (macro MaxHeapAttributeNumber), and after loop through all columns in the
> > function logicalrep_write_tuple, the function OutputPluginWrite will be 
> > invoked
> > immediately to actually send the data to the subscriber. This refreshes the
> > last time the subscriber received a message.
> > So I think this loop will not cause timeout issues.
> >
> 
> Right, I also don't think it can be a source of timeout.

OK. I have no comments for this version.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED


RE: Logical replication timeout problem

2022-03-27 Thread kuroda.hay...@fujitsu.com
Dear Wang-san,

Thank you for updating!
...but it cannot be applied to the current HEAD
because of commit 923def9a533.

Your patch seems to conflict with the commit that added an argument to
logicalrep_write_insert(). That commit allows specifying which columns to publish
by skipping some columns in logicalrep_write_tuple(),
which is called from logicalrep_write_insert() and logicalrep_write_update().

Do we have to consider any special case for that?
I thought a timeout might occur if users have a huge table and publish only a
few columns, but that is a corner case.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-03-24 Thread kuroda.hay...@fujitsu.com
Dear Amit,

> It seems by mistake you have removed the changes from pgoutput_message
> and pgoutput_truncate functions. I have added those back.
> Additionally, I made a few other changes: (a) moved the function
> UpdateProgress to pgoutput.c as it is not used outside it, (b) change
> the new parameter in plugin API from 'send_keep_alive' to 'last_write'
> to make it look similar to WalSndPrepareWrite and WalSndWriteData, (c)
> made a number of changes in WalSndUpdateProgress API, it is better to
> move keep-alive code after lag track code because we do process
> replies at that time and there it will compute the lag; (d)
> changed/added comments in the code.

LGTM, but the patch cannot be applied to the current HEAD.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-03-14 Thread kuroda.hay...@fujitsu.com
Dear Vignesh,

> Thanks for kind explanation.
> I read above and your doc in 0002, and I put some comments.

I forgot a comment about the 0002 doc.

5. create_subscription.sgml - about your example

Three possibilities are listed in the doc,
but I was not sure about case b).
In that situation Node1 and Node2 have already become multi-master,
and the data has already been synced at that time.
If so, how can "there is data present only in one Node" happen?
Cases a) and c) seem reasonable.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-03-14 Thread kuroda.hay...@fujitsu.com
Dear Vignesh,

Thank you for updating your patch!

> Let's consider an existing Multi master logical replication setup
> between Node1 and Node2 that is created using the following steps:
> a) Node1 - Publication publishing employee table - pub1
> b) Node2 - Subscription subscribing from publication pub1 with
> publish_local_only - sub1_pub1_node1
> c) Node2 - Publication publishing employee table - pub2
> d) Node1 - Subscription subscribing from publication pub2 with
> publish_local_only - sub2_pub2_node2
> 
> To create a subscription in node3, we will be using the following steps:
> a) Node2 - Publication publishing employee table. - pub3
> b) Node3 - Subscription subscribing from publication in Node2 with
> publish_local_only - sub3_pub3_node2
> 
> When we create a subscription in Node3, Node3 will connect to
> Node2(this will not be done in Node3) and check if the employee table
> is present in pg_subscription_rel, in our case Node2 will have
> employee table present in pg_subscription_rel (sub1_pub1_node1
> subscribing to employee table from pub1 in Node1). As employee table
> is being subscribed in node2 from node1, we will throw an error like
> below:
> postgres=# create subscription sub2 CONNECTION 'dbname =postgres port
> = ' publication pub2 with (publish_local_only=on);
> ERROR:  CREATE/ALTER SUBSCRIPTION with publish_local_only and
> copy_data as true is not allowed when the publisher might have
> replicated data, table:public.t1 might have replicated data in the
> publisher
> HINT:  Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force

Thanks for the kind explanation.
I read the above and your doc in 0002, and I have some comments.

1. alter_subscription.sgml

```
-copy_data (boolean)
+copy_data (boolean | force)
```

I think it should be documented as an enum. For example, the huge_pages GUC
parameter accepts {on, off, try}, and it is documented as an enum.

2. create_subscription.sgml

```
-copy_data (boolean)
+copy_data (boolean | force)
```

Same as above.

3. create_subscription.sgml

```
+
+ 
+  If the publication tables were also subscribing data in the publisher
+  from other publishers, it will affect the
+  CREATE SUBSCRIPTION based on the value specified
+  for publish_local_only option. Refer to the
+   for details.
+ 
```

I searched the docs, but the words "publication tables" do not appear elsewhere.
How about "tables in the publication"?

4. create_subscription.sgml - about your example 

In the first section, we should describe the 2-node case in more detail,
as Amit mentioned in [1]. I think Option-3 can be resolved by defining
subscriptions on both nodes with publish_local_only = true and copy_data =
force.

> I was initially planning to add srreplicateddata field but I have
> changed it slightly to keep the design simple. Now we just check if
> the relation is present in pg_subscription_rel and throw an error if
> copy_data and publish_local_only option is specified. The changes for
> the same are available at [1].
> 
> [1] -
> https://www.postgresql.org/message-id/CALDaNm0V%2B%3Db%3DCeZJNAAU
> O2PmSXH5QzNX3jADXb-0hGO_jVj0vA%40mail.gmail.com
> Thoughts?

Actually, I doubted that the column was really needed, so this is good.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-03-11 Thread kuroda.hay...@fujitsu.com
Hi Vignesh,

While considering the second problem, I became quite confused.
I would be happy if you could answer my question.

> To handle this if user has specified only_local option, we could throw
> a warning or error out while creating subscription in this case, we
> could have a column srreplicateddata in pg_subscription_rel which
> could indicate if the table has any replicated data or not:
> postgres=# select * from pg_subscription_rel;
>  srsubid | srrelid | srsubstate | srsublsn  | srreplicateddata
> -+-++---+--
>16389 |   16384 | r  | 0/14A4640 |t
>16389 |   16385 | r  | 0/14A4690 |f
> (1 row)
> In the above example, srreplicateddata with true indicates, tabel t1
> whose relid is 16384 has replicated data and the other row having
> srreplicateddata  as false indicates table t2 whose relid is 16385
> does not have replicated data.
> When creating a new subscription, the subscriber will connect to the
> publisher and check if the relation has replicated data by checking
> srreplicateddata in pg_subscription_rel table.
> If the table has any replicated data, log a warning or error for this.

IIUC, srreplicateddata represents whether the subscribed data was generated
not on the publisher but on another node.
My first impression is that the name 'srreplicateddata' is not friendly,
because all subscribed data is replicated from the publisher.
Also, I was not sure how the value of the column is set.
IIUC, filtering by replication origin is done on the publisher node,
and the subscriber node cannot know
whether some data was really filtered or not.
If we distinguish by the subscriber option publish_local_only, your example
cannot be reproduced, because the same subscription would have different values
of 'srreplicateddata'.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-03-08 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thank you for updating!

> > Do we need adding a test for them? I think it can be added to 100_bugs.pl.
> > Actually I tried to send PoC, but it does not finish to implement that.
> > I'll send if it is done.
> I'm not sure if it is worth it.
> Because the reproduced test of this bug might take some time and might risk
> making the build farm slow, so I am not sure if others would like the
> reproduced test of this bug.

I take your reply to mean that it is difficult to stabilize and
minimize such a test, so I withdraw the above.
Here are some comments for v2, mainly cosmetic ones.

1. pgoutput_change
```
+   bool is_send = true;
```

My first impression is that is_send should be initialized to false
and changed to true when OutputPluginWrite() is called.
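
A minimal sketch of what I mean, based on the quoted patch (not the actual
patch code):

```c
bool	is_send = false;

if (relentry->pubactions.pubinsert)
{
	/* ... prepare and write the change as the patch already does ... */
	OutputPluginWrite(ctx, true);
	is_send = true;
}
```

This way the deeply nested block that resets the flag becomes unnecessary.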


2. pgoutput_change
```
+   {
+   is_send = false;
+   break;
+   }
```

There are too many indentation levels here; I think they can be removed.
See the comment above.

3. WalSndUpdateProgress
```
+   /*
+* If half of wal_sender_timeout has lapsed without send 
message standby,
+* send a keep-alive message to the standby.
+*/
```

The comment seems inconsistent with the others:
here it says "keep-alive", but the other parts say "keepalive".

4. ReorderBufferProcessTXN
```
+                                       change->data.inval.ninvalidations,
+                                       change->data.inval.invalidations);
```

Maybe these lines break the 80-column rule.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-03-08 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thank you for updating the patch! Nice self-review.

> And I looked into the function WalSndUpdateProgress. I found function
> WalSndUpdateProgress try to record the time of some message(by function
> LagTrackerWrite) sent to subscriber, such as in function pgoutput_commit_txn.

Yeah, I think you are right.

> Then, when publisher receives the reply message from the subscriber(function
> ProcessStandbyReplyMessage), publisher invokes LagTrackerRead to calculate
> the
> delay time(refer to view pg_stat_replication).
> Referring to the purpose of LagTrackerWrite, I think it is no need to log time
> when sending keepalive messages here.
> So when the parameter send_keep_alive of function WalSndUpdateProgress is
> true,
> skip the recording time.

I also read them. LagTracker records the elapsed time between the publisher
sending a commit and receiving the reply from the subscriber, right? That seems good.

Do we need to add a test for this? I think it could be added to 100_bugs.pl.
Actually, I tried to write a PoC, but I have not finished implementing it.
I'll send it when it is done.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-03-06 Thread kuroda.hay...@fujitsu.com
Dear Vignesh,

> I felt changing only_local option might be useful for the user while
> modifying the subscription like setting it with a different set of
> publications. Changes for this are included in the v2 patch attached
> at [1].

+1, thanks. I'll post again if I notice something worth mentioning.

> Shall we get a few opinions on this and take it in that direction?

I prefer the subscriber-side option, but I also think both are reasonable.
+1 on asking other reviewers.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Handle infinite recursion in logical replication setup

2022-03-06 Thread kuroda.hay...@fujitsu.com
Dear Peter,

> > So, why does the patch use syntax option 1?

IMU it might be useful for the following case.

Assume a multi-master configuration with node1 and node2:
node1 has a publication pub1 and a subscription sub2, and node2 has pub2 and sub1.

Now consider that a new node, node3, is added
that subscribes to some changes from node2.

If the feature is introduced as option 1, a new publication must be defined on
node2.
If it is introduced as option 2, however, pub2 can perhaps be reused,
i.e., declaring multiple publications can be avoided.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-03-04 Thread kuroda.hay...@fujitsu.com
Dear Wang,

> Some codes were added in ReorderBufferProcessTXN() for treating DDL,

My mailer misbehaved, so I'll post my comments again. Sorry.

Some code was added in ReorderBufferProcessTXN() for handling DDL,
but I doubt that updating accept_writes is needed.
IMU, the parameter is read by OutputPluginPrepareWrite() in order to align
messages: they should have a header - like 'w' - before their body. But here
only a keepalive message is sent, with no meaningful changes, so I think it
might not be needed.
I commented out the line and tested as you did [1], and no timeouts or errors
were found.
Do you have a reason for keeping it?

[1]: https://www.postgresql.org/message-id/OS3PR01MB6275A95FD44DC6C46058EA389E3B9%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-03-03 Thread kuroda.hay...@fujitsu.com
Hi Hackers,

> It's not happy, but I'm not sure about a good solution. I made a timer 
> reschedule
> if connection lost had detected. But if queries in the transaction are quite 
> short,
> catching SIGINT may be fail.

The attached patch uses another approach: it sets the pending flags again if
DoingCommandRead is true.
If a remote server goes down while the backend is idle in a transaction,
the next query will fail because of the ereport(ERROR).
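
A rough sketch of the idea (CheckingRemoteServersTimeoutPending is the flag
from this patch set; the other variables are existing backend globals):

```c
/*
 * In the timeout handler: if the backend is just reading a command, keep
 * the flags pending so that the next query hits them and fails with an
 * ereport(ERROR).
 */
if (DoingCommandRead)
{
	CheckingRemoteServersTimeoutPending = true;
	QueryCancelPending = true;
	InterruptPending = true;
}
```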

What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v14_0001_expose_cancel_message.patch
Description: v14_0001_expose_cancel_message.patch


v14_0002_add_health_check.patch
Description: v14_0002_add_health_check.patch


v14_0003_add_doc.patch
Description: v14_0003_add_doc.patch


RE: Logical replication timeout problem

2022-03-03 Thread kuroda.hay...@fujitsu.com
Dear Wang,

> Attach the new patch. [suggestion by Kuroda-San]
> 1. Fix the typo.
> 2. Improve comment style.
> 3. Fix missing consideration.
> 4. Add comments to clarifies above functions and function calls.

Thank you for updating the patch! I confirmed they were fixed.

```
case REORDER_BUFFER_CHANGE_INVALIDATION:
-			/* Execute the invalidation messages locally */
-			ReorderBufferExecuteInvalidations(
-							change->data.inval.ninvalidations,
-							change->data.inval.invalidations);
-			break;
+			{
+				LogicalDecodingContext *ctx = rb->private_data;
+
+				Assert(!ctx->fast_forward);
+
+				/* Set output state. */
+				ctx->accept_writes = true;
+				ctx->write_xid = txn->xid;
+				ctx->write_location = change->lsn;
```

Some code was added in ReorderBufferProcessTXN() for handling DDL,




I'd also be happy if you gave the patch a version number :-).


Best Regards,
Hayato Kuroda
FUJITSU LIMITED

> -Original Message-
> From: Wang, Wei/王 威 
> Sent: Wednesday, March 2, 2022 11:06 AM
> To: Kuroda, Hayato/黒田 隼人 
> Cc: Fabrice Chapuis ; Simon Riggs
> ; Petr Jelinek
> ; Tang, Haiying/唐 海英
> ; Amit Kapila ;
> PostgreSQL Hackers ; Ajin Cherian
> 
> Subject: RE: Logical replication timeout problem
> 
> On Mon, Feb 28, 2022 at 6:58 PM Kuroda, Hayato/黒田 隼人
>  wrote:
> > Dear Wang,
> >
> > > Attached a new patch that addresses following improvements I have got
> > > so far as
> > > comments:
> > > 1. Consider other changes that need to be skipped(truncate, DDL and
> > > function calls pg_logical_emit_message). [suggestion by Ajin, Amit]
> > > (Add a new function SendKeepaliveIfNecessary for trying to send
> > > keepalive
> > > message.)
> > > 2. Set the threshold conservatively to a static value of
> > > 1.[suggestion by Amit, Kuroda-San] 3. Reset sendTime in function
> > > WalSndUpdateProgress when send_keep_alive is false. [suggestion by
> > > Amit]
> >
> > Thank you for giving a good patch! I'll check more detail later, but it can 
> > be
> > applied my codes and passed check world.
> > I put some minor comments:
> Thanks for your comments.
> 
> > ```
> > + * Try to send keepalive message
> > ```
> >
> > Maybe missing "a"?
> Fixed. Add missing "a".
> 
> > ```
> > +   /*
> > +   * After continuously skipping SKIPPED_CHANGES_THRESHOLD
> changes, try
> > to send a
> > +   * keepalive message.
> > +   */
> > ```
> >
> > This comments does not follow preferred style:
> > https://www.postgresql.org/docs/devel/source-format.html
> Fixed. Correct wrong comment style.
> 
> > ```
> > @@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext
> *ctx,
> > bool last_write)
> >   * Update progress tracking (if supported).
> >   */
> >  void
> > -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
> > +OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool
> > +send_keep_alive)
> > ```
> >
> > This function is no longer doing just tracking.
> > Could you update the code comment above?
> Fixed. Update the comment above function OutputPluginUpdateProgress.
> 
> > ```
> > if (!is_publishable_relation(relation))
> > return;
> > ```
> >
> > I'm not sure but it seems that the function exits immediately if relation 
> > is a
> > sequence, view, temporary table and so on. Is it OK? Does it never happen?
> I did some checks to confirm this. After my confirmation, there are several
> situations that can cause a timeout. For example, if I insert many date into
> table sql_features in a long transaction, subscriber-side will time out.
> Although I think users should not modify these tables arbitrarily, it could
> happen. To be conservative, I think this use case should be addressed as well.
> Fixed. Invoke function SendKeepaliveIfNecessary before return.
> 
> > ```
> > +   SendKeepaliveIfNecessary(ctx, false);
> > ```
> >
> > I think a comment is needed above which clarifies sending a keepalive
> message.
> Fixed. Before invoking function SendKeepaliveIfNecessary, add the
> corresponding
> comment.
> 
> Attach the new patch. [suggestion by Kuroda-San]
> 1. Fix the typo.
> 2. Improve comment style.
> 3. Fix missing consideration.
> 4. Add comments to clarifies above functions and function calls.
> 
> Regards,
> Wang wei


RE: Handle infinite recursion in logical replication setup

2022-03-01 Thread kuroda.hay...@fujitsu.com
Hi Vignesh,

> In logical replication, currently Walsender sends the data that is
> generated locally and the data that are replicated from other
> instances. This results in infinite recursion in circular logical
> replication setup.

Thank you for the good explanation. I understand that this fix can be used
for bidirectional replication.

> Here there are two problems for the user: a) incremental
> synchronization of table sending both local data and replicated data
> by walsender b) Table synchronization of table using copy command
> sending both local data and replicated data

So you want to solve these two problems and are currently focused on
the first one, right? We can check them one by one.

> For the first problem "Incremental synchronization of table by
> Walsender" can be solved by:
> Currently the locally generated data does not have replication origin
> associated and the data that has originated from another instance will
> have a replication origin associated. We could use this information to
> differentiate locally generated data and replicated data and send only
> the locally generated data. This "only_local" could be provided as an
> option while subscription is created:
> ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=5433'
> PUBLICATION pub1 with (only_local = on);

Sounds good, but I am not sure whether that assumption will always hold.

I played with your patch, but it could not be applied to the current master.
I tested from bd74c40 and confirmed that the infinite loop did not appear.

only_local could not be set from the ALTER SUBSCRIPTION command.
Is that expected?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-02-28 Thread kuroda.hay...@fujitsu.com
Dear Wang,

> Attached a new patch that addresses following improvements I have got so far 
> as
> comments:
> 1. Consider other changes that need to be skipped(truncate, DDL and function
> calls pg_logical_emit_message). [suggestion by Ajin, Amit]
> (Add a new function SendKeepaliveIfNecessary for trying to send keepalive
> message.)
> 2. Set the threshold conservatively to a static value of 1.[suggestion by 
> Amit,
> Kuroda-San]
> 3. Reset sendTime in function WalSndUpdateProgress when send_keep_alive is
> false. [suggestion by Amit]

Thank you for the good patch! I'll check it in more detail later,
but it applies on top of my code and passes check-world.
Some minor comments:

```
+ * Try to send keepalive message
```

Maybe missing "a"?

```
+   /*
+   * After continuously skipping SKIPPED_CHANGES_THRESHOLD changes, try to 
send a
+   * keepalive message.
+   */
```

This comment does not follow the preferred style:
https://www.postgresql.org/docs/devel/source-format.html

```
@@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, 
bool last_write)
  * Update progress tracking (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool 
send_keep_alive)
```

This function no longer does just progress tracking.
Could you update the code comment above it?

```
if (!is_publishable_relation(relation))
return;
```

I'm not sure, but it seems that the function exits immediately if the relation
is a sequence, view, temporary table, and so on. Is that OK? Can it never happen?
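
If it can happen, something like the following might be needed (a sketch;
SendKeepaliveIfNecessary() is the function added by this patch):

```c
if (!is_publishable_relation(relation))
{
	/* keep the subscriber alive even while skipping this relation */
	SendKeepaliveIfNecessary(ctx, false);
	return;
}
```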

```
+   SendKeepaliveIfNecessary(ctx, false);
```

I think a comment is needed above this line to clarify that a keepalive message is sent.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Logical replication timeout problem

2022-02-24 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thank you for explaining the background of the patch.

> According to our discussion, we need to send keepalive messages to subscriber
> when skipping changes.
> One approach is that **for each skipped change**, we try to send keepalive
> message by calculating whether a timeout will occur based on the current time
> and the last time the keepalive was sent. But this will brings slight 
> overhead.
> So I want to try another approach: after **constantly skipping some changes**,
> we try to send keepalive message by calculating whether a timeout will occur
> based on the current time and the last time the keepalive was sent.

You mean that calls to system functions like GetCurrentTimestamp() should be
reduced, right? I'm not sure how much it affects performance, but it seems
reasonable.

> IMO, we should send keepalive message after skipping a certain number of
> changes constantly.
> And I want to calculate the threshold dynamically by using a fixed value to
> avoid adding too much code.
> In addition, different users have different machine performance, and users can
> modify wal_sender_timeout, so the threshold should be dynamically calculated
> according to wal_sender_timeout.

Your experiment seems reasonable, but the background cannot be understood from
the code comments. I prefer a static threshold because it is simpler,
as Amit also said in the following:

https://www.postgresql.org/message-id/CAA4eK1%2B-p_K_j%3DNiGGD6tCYXiJH0ypT4REX5PBKJ4AcUoF3gZQ%40mail.gmail.com

> In the existing code, similar operations on wal_sender_timeout use the style 
> of
> (wal_sender_timeout / 2), e.g. function WalSndKeepaliveIfNecessary. So I think
> it should be consistent in this patch.
> But I think it is better to use magic number too. Maybe we could improve it in
> a new thread.

I checked the code, and +1 to your approach. We should handle it in another
thread if needed.

BTW, this patch cannot be applied to the current master.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-23 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thank you for your quick review! I attached a new version.
I found the previous patches had wrong file names. Sorry.

> The connection check timer is re-scheduled repeatedly even while the backend 
> is
> in idle state or is running a local transaction that doesn't access to any 
> foreign
> servers. I'm not sure if it's really worth checking the connections even in 
> those
> states. Even without the periodic connection checks, if the connections are 
> closed
> in those states, subsequent GetConnection() will detect that closed connection
> and re-establish the connection when starting remote transaction. Thought?

Indeed. We can now control the timer in the FDW layer, so disable_timeout()
was added at the bottom of pgfdw_xact_callback().

> When a closed connection is detected in idle-in-transaction state and SIGINT 
> is
> raised, nothing happens because there is no query running to be canceled by
> SIGINT. Also in this case the connection check timer gets disabled. So we can 
> still
> execute queries that don't access to foreign servers, in the same 
> transaction, and
> then the transaction commit fails. Is this expected behavior?

It's not ideal, but I'm not sure about a good solution. I made the timer
reschedule itself if a lost connection had been detected. But if the queries in
the transaction are quite short, catching SIGINT may fail.

> When I shutdowned the foreign server while the local backend is in
> idle-in-transaction state, the connection check timer was triggered and 
> detected
> the closed connection. Then when I executed COMMIT command, I got the
> following WARNING message. Is this a bug?
> 
>  WARNING:  leaked hash_seq_search scan for hash table 0x7fd2ca878f20

Fixed. It was caused by hash_seq_term() not being called when the checker
detects a lost connection.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v13_0001_expose_cancel_message.patch
Description: v13_0001_expose_cancel_message.patch


v13_0002_add_health_check.patch
Description: v13_0002_add_health_check.patch


v13_0003_add_doc.patch
Description: v13_0003_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-21 Thread kuroda.hay...@fujitsu.com
Cfbot is still angry because of a missing PGDLLIMPORT, so a fixed version is attached.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v12_0001_add_checking_infrastracture.patch
Description: v12_0001_add_checking_infrastracture.patch


v12_0002_add_health_check.patch
Description: v12_0002_add_health_check.patch


v12_0003_add_doc.patch
Description: v12_0003_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-21 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

> cfbot is reporting that the 0002 patch fails to be applied cleanly. Could you 
> update
> the patch?
> http://cfbot.cputube.org/patch_37_3388.log

Thanks for reporting, and sorry for the inconvenience.
My repo was not at the latest version. The attached patches apply on top of 52e4f0c.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v11_0001_add_checking_infrastracture.patch
Description: v11_0001_add_checking_infrastracture.patch


v11_0002_add_health_check.patch
Description: v11_0002_add_health_check.patch


v11_0003_add_doc.patch
Description: v11_0003_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-21 Thread kuroda.hay...@fujitsu.com
Dear Horiguchi-san, Fujii-san,

> > I understood here as removing following mechanism from core:
> >
> > * disable timeout at end of tx.
> > * skip if held off or read commands
> 
> I think we're on the same page. Anyway query cancel interrupt is
> ignored while rading input.
> 
> > > - If an existing connection is found to be dead, just try canceling
> > >   the query (or sending query cancel).
> > > One issue with it is how to show the decent message for the query
> > > cancel, but maybe we can have a global variable that suggests the
> > > reason for the cancel.
> >
> > Currently I have no good idea for that but I'll try.
> 
> However, I would like to hear others' opnions about the direction, of
> course.
>

Based on this idea, I re-implemented the feature. Almost all of it has
moved to postgres_fdw. An overview of my patch follows:

# core

* Exposes QueryCancelMessage for error reporting.
* Uses the above message if it is not NULL.

# postgres_fdw

* Defines a new GUC, postgres_fdw.health_check_interval
  (renamed to something simpler).
* Registers a timeout when initializing the connection hash.
* Raises SIGINT and sets QueryCancelMessage
  if a lost connection is detected.
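
A rough sketch of that flow is shown below (the handler and check names are
illustrative only; QueryCancelMessage is the variable exposed by 0001):

```c
/*
 * Illustrative only: when the health-check timeout fires and the socket
 * turns out to be closed, set the exposed message and raise a query
 * cancel against our own backend.
 */
static void
health_check_timeout_handler(void)
{
	if (foreign_connection_is_closed())	/* e.g. checked via WL_SOCKET_CLOSED */
	{
		QueryCancelMessage = "Foreign Server (servername) might be down.";
		kill(MyProcPid, SIGINT);
	}
}
```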

I also attached a test as a zipped file to keep cfbot quiet.
When a lost connection is detected, the following message is shown:

```
ERROR:  Foreign Server (servername) might be down.
```

What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v10_0001_expose_cancel_message.patch
Description: v10_0001_expose_cancel_message.patch


v10_0002_add_health_check.patch
Description: v10_0002_add_health_check.patch


v10_0003_add_doc.patch
Description: v10_0003_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-21 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

> Isn't it a very special case where many FDWs use their own user timeouts? 
> Could
> you tell me the assumption that you're thinking, especially how many FDWs are
> working?

I came up with a case like a star schema, where a postgres database connects
to several data stores.
If each DBMS is different and each FDW has its own timeout, many timeouts will
be registered.
But it may be a corner case and should not be conflated with the OLTP case.
So I'll post a new patch based on his suggestion.


Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-17 Thread kuroda.hay...@fujitsu.com
Dear Horiguchi-san,

> I think we just don't need to add the special timeout kind to the
> core.  postgres_fdw can use USER_TIMEOUT and it would be suffiction to
> keep running health checking regardless of transaction state then fire
> query cancel if disconnection happens. As I said in the previous main,
> possible extra query cancel woud be safe.

I finally figured out that you meant the user-defined timeout system.
At first - before posting to hackers - I designed it like that,
but I was afraid of the overhead of many FDWs registering timeouts
and calling setitimer() many times. Is that too overcautious?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
 





RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-16 Thread kuroda.hay...@fujitsu.com
> I understood here as removing following mechanism from core:
> 
> * disable timeout at end of tx.

While reading it again, I realized this part might be wrong.
Sorry for the inconvenience.
But anyway, some code should be (re)moved from core, right?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED




RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-16 Thread kuroda.hay...@fujitsu.com
Dear Horiguchi-san,

Thank you for your suggestions. I want to confirm what you are saying.

> FWIW, I'm not sure this feature necessarily requires core support
> dedicated to FDWs.  The core have USER_TIMEOUT feature already and
> FDWs are not necessarily connection based.  It seems better if FDWs
> can implement health check feature without core support and it seems
> possible.  Or at least the core feature should be more generic and
> simpler. Why don't we just expose InTransactionHealthCheckCallbacks or
> something and operating functions on it?

My understanding is that the core part is too complicated and the FDW side is too simplistic, right?

> Mmm. AFAICS the running command will stop with "canceling statement
> due to user request", which is a hoax.  We need a more decent message
> there.

+1 for better messages.

> I understand that the motive of this patch is "to avoid wasted long
> local work when fdw-connection dies".

Yeah, your understanding is right.

> In regard to the workload in
> your first mail, it is easily avoided by ending the transaction as soon
> as remote access ends. This feature doesn't work for the case "begin;
> ; ". But the same measure also works in
> that case.  So the only case where this feature is useful is "begin;
> ; ; ; end;".  But in the first
> place how frequently do you expecting remote-connection close happens?
> If that happens so frequently, you might need to recheck the system
> health before implementing this feature.  Since it is correctly
> detected when something really went wrong, I feel that it is a bit too
> complex for the usefulness especially for the core part.

Thanks for analyzing the motivation.
Indeed, some cases can be resolved by splitting the transaction, and this event
rarely happens.

> In conclusion, as my humble opinion I would like to propose to reduce
> this feature to:
> 
> - Just periodically check health (in any aspect) of all live
>   connections regardless of the session state.

I understood this as removing the following mechanisms from core:

* disabling the timeout at the end of a transaction
* skipping the check if interrupts are held off or commands are being read

> - If an existing connection is found to be dead, just try canceling
>   the query (or sending query cancel).
> One issue with it is how to show the decent message for the query
> cancel, but maybe we can have a global variable that suggests the
> reason for the cancel.

Currently I have no good idea for that, but I'll try.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED





RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-15 Thread kuroda.hay...@fujitsu.com
Dear Hackers,

I found that the patches we depend on have been committed, so I rebased:
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=50e570a59e7f86bb41f029a66b781fc79b8d50f1

In this version there is a small change in the postgres_fdw part.
A check using WaitEventSetCanReportClosed() was added
because some OSes cannot wait for the WL_SOCKET_CLOSED event.
Note that the test cannot be added to the regression tests
because cfbot might not be happy with it.
In my environment, the test contained in the previous patches works well.

0001 and 0002 are unchanged from the previous version.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v09_0003_helth_check_for_postgres_fdw.patch
Description: v09_0003_helth_check_for_postgres_fdw.patch


v09_0002_add_doc_about_infrastructure.patch
Description: v09_0002_add_doc_about_infrastructure.patch


v09_0001_add_checking_infrastracture.patch
Description: v09_0001_add_checking_infrastracture.patch


RE: Logical replication timeout problem

2022-02-08 Thread kuroda.hay...@fujitsu.com
Dear Wang,

Thank you for the patch.
I applied it and confirmed that the code passes the regression tests.
A short review:

```
+   static int skipped_changes_count = 0;
+   /*
+    * Conservatively, at least 150,000 changes can be skipped in 1s.
+    *
+    * Because we use half of wal_sender_timeout as the threshold, and the
+    * unit of wal_sender_timeout in process is ms, the final threshold is
+    * wal_sender_timeout * 75.
+    */
+   int skipped_changes_threshold = wal_sender_timeout * 75;
```

I'm not sure about this; could you tell me the background of this calculation?
Is the assumption reasonable?

```
@@ -654,20 +663,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
    case REORDER_BUFFER_CHANGE_INSERT:
        if (!relentry->pubactions.pubinsert)
+       {
+           if (++skipped_changes_count >= skipped_changes_threshold)
+           {
+               OutputPluginUpdateProgress(ctx, true);
+
+               /*
+                * After sending keepalive message, reset
+                * skipped_changes_count.
+                */
+               skipped_changes_count = 0;
+           }
            return;
+       }
        break;
```

Is the if-statement needed? In the walsender case, OutputPluginUpdateProgress()
leads to WalSndUpdateProgress(),
and that function also has a threshold for pinging.

```
static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
-   static TimestampTz sendTime = 0;
+   static TimestampTz trackTime = 0;
    TimestampTz now = GetCurrentTimestamp();
 
+   if (send_keep_alive)
+   {
+       /*
+        * If half of wal_sender_timeout has lapsed without send message standby,
+        * send a keep-alive message to the standby.
+        */
+       static TimestampTz sendTime = 0;
+       TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+                                                           wal_sender_timeout / 2);
+       if (now >= ping_time)
+       {
+           WalSndKeepalive(false);
+
+           /* Try to flush pending output to the client */
+           if (pq_flush_if_writable() != 0)
+               WalSndShutdown();
+           sendTime = now;
+       }
+   }
+
```

* +1 on renaming to trackTime.
* The `/ 2` might be a magic number. How about the following? (Renaming is very welcome.)

```
+#define WALSND_LOGICAL_PING_FACTOR 0.5
+   static TimestampTz sendTime = 0;
+   TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+                                       wal_sender_timeout * WALSND_LOGICAL_PING_FACTOR)
```

Could you add a commitfest entry for cfbot?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-02-02 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thank you for good suggestions.

> This logic sounds complicated to me. I'm afraid that FDW developers may a bit
> easily misunderstand the logic and make the bug in their FDW.
> Isn't it simpler to just disable the timeout in core whenever the transaction 
> ends
> whether committed or aborted, like statement_timeout is disabled after each
> command? For example, call something like DisableForeignCheckTimeout() in
> CommitTransaction() etc.

Your idea is to stop the timer at the end of each transaction, right?
I had not adopted that because I thought some developers might not want to stop
the timer even when a transaction ends. However, that caused a complex
situation and has no good use case, so I implemented your logic.

> > You are right. I think this suggests that error-reporting should be done in 
> > the
> core layer.
> > For meaningful error reporting, I changed a callback specification
> > that it should return ForeignServer* which points to downed remote server.
> 
> This approach seems to assume that FDW must manage all the ForeignServer
> information so that the callback can return it. Is this assumption valid for 
> all the
> FDW?

I'm not sure; the assumption might be too optimistic.
mysql_fdw can easily return a ForeignServer* because it caches the serverid,
but dblink and maybe oracle_fdw cannot.

> How about making FDW trigger a query cancel interrupt by signaling SIGINT to
> the backend, instead?

I understood that the error should be caught via QueryCancelPending.
Could you check 0003? Does it follow that approach?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v08_0001_add_checking_infrastracture.patch
Description: v08_0001_add_checking_infrastracture.patch


v08_0002_add_doc.patch
Description: v08_0002_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-01-31 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thank you for reviewing! I attached the latest version.

> When more than one FDWs are used, even if one FDW calls this function to
> disable the timeout, its remote-server-check-callback can still be called. Is 
> this
> OK?
> Please imagine the case where two FDWs are used and they registered their
> callbacks. Even when one FDW calls TryDisableRemoteServerCheckingTimeout(),
> if another FDW has not called that yet, the timeout is still being enabled. 
> If the
> timeout is triggered during that period, even the callback registered by the 
> FDW
> that has already called TryDisableRemoteServerCheckingTimeout() would be
> called.

Indeed, and that should be avoided. I added a counter to
CheckingRemoteServersCallbackItem.
The register function returns the registered item, and it must be passed as
the argument to the enable and disable functions.
Callback functions are called only when item->counter is larger than zero.
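
The item now looks roughly like this (an assumed layout, for illustration only):

```c
typedef struct CheckingRemoteServersCallbackItem
{
	struct CheckingRemoteServersCallbackItem *next;
	CheckingRemoteServersCallback callback;
	void	   *arg;
	int			counter;	/* bumped by enable, decremented by disable;
							 * the callback runs only while counter > 0 */
} CheckingRemoteServersCallbackItem;
```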

> + if (remote_servers_connection_check_interval > 0)
> + {
> + CallCheckingRemoteServersCallbacks();
> +
>   enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT,
> +
> remote_servers_connection_check_interval);
> 
> LockErrorCleanup() needs to be called before reporting the error, doesn't it?

You are right. I think this suggests that error reporting should be done in
the core layer.
For meaningful error reporting, I changed the callback specification
so that it returns a ForeignServer* pointing to the downed remote server.

> This can cause an error even while DoingCommandRead == true. Is that safe?

I read the code again, and I think it is not safe. It is OK when
whereToSendOutput is DestRemote,
but not in InteractiveBackend().

I changed it so that the if-statement for CheckingRemoteServersTimeoutPending
is moved just after the one for ClientConnectionLost, because the latter may
throw a FATAL error and disconnect from the client.
from client.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v07_0001_add_checking_infrastracture.patch
Description: v07_0001_add_checking_infrastracture.patch


v07_0002_add_doc.patch
Description: v07_0002_add_doc.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-01-20 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san, Zhihong,

I attached the latest version.
The biggest change is that callbacks are no longer unregistered at the end of
transactions.
FDW developers must enable or disable the timeout instead, via new APIs.

The timer is turned on when:
* the new GUC is >= 0, and
* anyone calls TryEnableRemoteServerCheckingTimeout().

I think this version has reduced overhead, but it might not be developer
friendly...

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



v06_0002_add_doc.patch
Description: v06_0002_add_doc.patch


v06_0001_add_checking_infrastracture.patch
Description: v06_0001_add_checking_infrastracture.patch


RE: [Proposal] Add foreign-server health checks infrastructure

2022-01-20 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thank you for your interest! I'll post a new version within several days.

> > Yeah, remote-checking timeout will be enable even ifa local transaction is
> opened.
> > In my understanding, postgres cannot distinguish whether opening 
> > transactions
> > are using only local object or not.
> > My first idea was that turning on the timeout when GetFdwRoutineXXX
> functions
> > were called,
> 
> What about starting the timeout in GetConnection(), instead?

Did you mean the function in postgres_fdw/connection.c?
In my understanding, that means the timeout should be enabled or disabled
by each FDW extension.
I did not find any problems with that, so I'll change it accordingly and add new APIs.

> v05_0004_add_tests.patch failed to be applied to the master. Could you rebase 
> it?

That was caused by a test case added in postgres_fdw. Will rebase.

> The above change is included in both
> v5-0003-Use-WL_SOCKET_CLOSED-for-client_connection_check_.patch and
> v05_0002_add_doc.patch. If it should be in the former patch, it should be 
> removed
> from your patch v05_0002_add_doc.patch.

I was confused about the doc patch. Sorry for the inconvenience.

> There seems no user of UnregisterCheckingRemoteServersCallback(). So how
> about removing it?

Previously I kept the API for other extensions, but I cannot find any use cases.
Will remove.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [Proposal] Add foreign-server health checks infrastructure

2022-01-19 Thread kuroda.hay...@fujitsu.com
Dear Zhihong,

Thank you for reviewing! And I apologize for the late reply.
I'll post a new patch set later.

> +   UnregisterAllCheckingRemoteServersCallback();
> 
> UnregisterAllCheckingRemoteServersCallback
> -> UnregisterAllCheckingRemoteServersCallbacks

Will fix.

> +   CheckingRemoteServersCallbackItem *item;
> +   item = fdw_callbacks;
> 
> The above two lines can be combined.

Will fix.

> +UnregisterCheckingRemoteServersCallback(CheckingRemoteServersCallback
> callback,
> +   void *arg)
> 
> Is the above func called anywhere ?

Currently not; I just kept the API for other FDW extensions.
But I cannot find any use cases, so I will remove it.

> +   if (item->callback == callback && item->arg == arg)
> 
> Is comparing argument pointers stable ? What if the same argument is cloned
> ?

This part is no longer needed. Will remove.

> +   CallCheckingRemoteServersCallbacks();
> +
> +   if (remote_servers_connection_check_interval > 0)
> +   enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT,
> +
>  remote_servers_connection_check_interval);
>
> Should the call to CallCheckingRemoteServersCallbacks() be placed under the
> if block checking remote_servers_connection_check_interval ?
> If remote_servers_connection_check_interval is 0, it seems the callbacks
> shouldn't be called.

Agreed. We forcibly stop the timeout when the GUC is set to 0 in the
assign_hook, so your suggestion is consistent with that. Will move.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-27 Thread kuroda.hay...@fujitsu.com
Dear Sawada-san,

> Good idea. But the application_name is actually truncated to 63
> characters (NAMEDATALEN - 1)? If so, we need to do substring(... for
> 63) instead.

Yeah, the parameter is truncated to one less than NAMEDATALEN:

```
max_identifier_length (integer)
Reports the maximum identifier length. It is determined as one less than the 
value of NAMEDATALEN when building the server.
The default value of NAMEDATALEN is 64; therefore the default 
max_identifier_length is 63 bytes,
which can be less than 63 characters when using multibyte encodings.
```

But in Fujii-san's patch the length is picked up by the following SQL, so I
think it works well.

```
SELECT max_identifier_length FROM pg_control_init()
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-26 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

> Attached is the patch that adds the regression test for
> postgres_fdw.application_name. Could you review this?
> 
> As Horiguchi-san suggested at [1], I split the test into two, and then 
> tweaked them
> as follows.
> 
> 1. Set application_name option of a server option to 'fdw_%d%p'
> 2. Set postgres_fdw.application_name to 'fdw_%a%u%%'
> 
> 'fdw_%d%p' and 'fdw_%a%u%%' still may be larger than NAMEDATALEN
> depending on the regression test environment. To make the test stable even in
> that case, the patch uses substring() is truncate application_name string in 
> the
> test query's condition to less than NAMEDATALEN.

I think it's good because it takes max_identifier_length into account,
and both setting methods are exercised.
Also, using pg_terminate_backend() is smart.
(Actually, I was writing a test that separates the cases into five parts, but
your patch is better.)

I should have noticed the test failure myself, but I missed it, sorry.
Do you know how to apply my patch and test it on the buildfarm animals?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-26 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san, Horiguchi-san,

I confirmed that the feature was committed but the test was reverted.
Now I'm watching the buildfarm.

But anyway, I want to say thank you for your contribution!

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] pg_stat_toast v0.3

2021-12-21 Thread kuroda.hay...@fujitsu.com
Dear Gunnar,

> The attached v0.3 includes
>   * columns "storagemethod" and "compressmethod" added as per Hayato
> Kuroda's suggestion

I prefer your implementation over making users refer to another system view.

> * gathering timing information

A minor comment:
I'm not sure when we should start measuring time.
If you want to record the time spent compressing, toast_compress_datum()
should be sandwiched between INSTR_TIME_SET_CURRENT() calls to calculate the
duration. Currently time_spent is calculated inside
pgstat_report_toast_activity(), but this has a systematic error.
If you want to record the time spent inserting/updating, however,
I think we should start measuring at the upper layer, maybe in
heap_toast_insert_or_update().
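
For example, measuring only the compression time could look roughly like this
(a sketch; time_spent and pgstat_report_toast_activity() are from your patch,
and the surrounding variables are assumed):

```c
instr_time	start,
			duration;
Datum		cvalue;

INSTR_TIME_SET_CURRENT(start);
cvalue = toast_compress_datum(value, cmethod);	/* only the work we measure */
INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);

/* hand only the compression time to the reporting function */
time_spent = INSTR_TIME_GET_MICROSEC(duration);
```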

> Any hints on how to write meaningful tests would be much appreciated!
> I.e., will a test

I searched for hints in the PG sources, and I think the src/test/modules/
subdirectory can be used. The following is an example:
Following is the example:
https://github.com/postgres/postgres/tree/master/src/test/modules/commit_ts

I attached a patch that creates a new test. Could you fill in the SQL file and
the expected output file?
output file?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



adding_new_test.patch
Description: adding_new_test.patch


RE: [PATCH] pg_stat_toast

2021-12-19 Thread kuroda.hay...@fujitsu.com
Dear Gunnar,

> postgres=# CREATE TABLE test (i int, lz4 text COMPRESSION lz4, std text);
> postgres=# INSERT INTO test  SELECT
> i,repeat(md5(i::text),100),repeat(md5(i::text),100) FROM
> generate_series(0,10) x(i);
> postgres=# SELECT * FROM pg_stat_toast WHERE schemaname = 'public';
> -[ RECORD 1 ]+--
> schemaname   | public
> reloid   | 16829
> attnum   | 2
> relname  | test
> attname  | lz4
> externalizations | 0
> compressions | 11
> compressionsuccesses | 11
> compressionsizesum   | 6299710
> originalsizesum  | 320403204
> -[ RECORD 2 ]+--
> schemaname   | public
> reloid   | 16829
> attnum   | 3
> relname  | test
> attname  | std
> externalizations | 0
> compressions | 11
> compressionsuccesses | 11
> compressionsizesum   | 8198819
> originalsizesum  | 320403204

I'm not sure about the TOAST internals, but compression methods are now configurable:
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=bbe0a81db69bd10bd166907c3701492a29aca294

How about adding a new attribute "method" to pg_stat_toast?
ToastAttrInfo's attr->tai_compression represents how the data is compressed,
so I think it's easy to add.
Or is it not needed because pg_attribute already has that information?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-16 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Sorry, I forgot to reply to your messages.

> >>if (strcmp(keywords[i], "application_name") == 0 &&
> >>values[i] != NULL && *(values[i]) != '\0')
> >
> > I'm not sure but do we have a case that values[i] becomes NULL
> > even if keywords[i] is "application_name"?
> 
> No for now, I guess. But isn't it safer to check that, too? I also could not 
> find strong
> reason why that check should be dropped. But you'd like to drop that?

No, I agree with the new check. I was just afraid that my original code had missed something.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-16 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

> Thanks for the review! At first I pushed 0001 patch.

I found your commit. Thanks!

> BTW, 0002 patch adds the regression test that checks
> pg_stat_activity.application_name. But three months before, we added the 
> similar
> test when introducing postgres_fdw.application_name GUC and
> reverted/removed it because it's not stable [1]. So we should review carefully
> whether the test 0002 patch adds may have the same issue or not. As far as I 
> read
> the patch, ISTM that the patch has no same issue. But could you double check
> that?

I agree that we will not face the problem.
When postgres_fdw_disconnect_all() is executed, we just send the character 'X' to
the remote backends (in sendTerminateConn() and lower-level functions) and
return without blocking.
After receiving the 'X' message, the remote backends call proc_exit() and the
processes die. The earlier test failure happened because the SELECT statement
ran before the backends had completely exited.
The current test searches pg_stat_activity with a condition strict enough not
to match such residual rows.
Hence I think this test is stable. What do you think?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: Allow escape in application_name

2021-12-15 Thread kuroda.hay...@fujitsu.com
Dear Fujii-san,

Thank you for updating! I read your patches and I have
only one comment.

>   if (strcmp(keywords[i], "application_name") == 0 &&
>   values[i] != NULL && *(values[i]) != '\0')

I'm not sure, but is there a case where values[i] becomes NULL
even though keywords[i] is "application_name"?

I think the other parts are perfect.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED


