Hi Euler,

Thank you for your reply. Your output is exactly what mine does not look like: I don't see that ordering, and not only under heavy load. The conditions under which this occurs make it hard to provide detailed information, and it also takes a while to trigger. I sent an earlier email describing what my output looks like, from a previous debugging session.
I can gather more information if need be, but I was interested in this bit:

> Instead, Postgres provides a replication progress mechanism [1] to do it.

It's not 100% clear to me what that would look like at the code level. Can you provide a high-level algorithm for how such code would work? For reference, our implementation, down to the bones, is very similar to this: https://adam-szpilewicz.pl/cdc-replication-from-postgresql-using-go-golang

Thanks for your help.

Regards,
José Neves

________________________________
From: Euler Taveira <eu...@eulerto.com>
Sent: 31 July 2023 15:27
To: José Neves <rafanev...@msn.com>; pgsql-hackers <pgsql-hack...@postgresql.org>
Subject: Re: CDC/ETL system on top of logical replication with pgoutput, custom client

On Sat, Jul 29, 2023, at 8:07 PM, José Neves wrote:
> I'm attempting to develop a CDC on top of Postgres, currently using version 12 at the latest minor release, with a custom client, and I'm running into issues with data loss caused by out-of-order logical replication messages.

Can you provide a test case that shows this issue? Did you try a newer version?

> The problem is as follows: Postgres streams logical replication events A, B, D, G, K, I, P. Upon an exit signal, we stop consuming new events at LSN K and wait 30s for out-of-order events. Let's say that we have only received A (and K, of course), so in the following 30s we get B and D; however, for whatever reason, G never arrives. Because pgoutput-based logical replication gives us no way to calculate the next LSN, we have no idea that G is missing, so we assume that everything has arrived, commit K to the Postgres slot, and shut down. In the next run, our worker starts receiving data from K onward, and G is lost forever... Meanwhile, Postgres moves forward with archiving, and we can't go back to check whether we lost anything. And even if we could, it would be extremely inefficient.

Logical decoding provides the changes to the output plugin at commit time.
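To make "at commit time" concrete, here is a minimal Go sketch under heavy simplifying assumptions. The `walRecord` and `change` types and the `decode` and `sampleWAL` helpers are hypothetical and are not Postgres structures. The sketch buffers changes per transaction and releases a transaction's changes only when its commit record is read, so whole transactions come out in commit order. Aborts, subtransactions, and streaming of large in-progress transactions are deliberately ignored.

```go
package main

import "fmt"

// walRecord is a hypothetical, heavily simplified stand-in for a WAL record
// belonging to some transaction; it is not a real Postgres structure.
type walRecord struct {
	lsn    uint64
	xid    uint32
	commit bool   // true if this is the transaction's commit record
	data   string // row change payload (unused for commit records)
}

// change is one row change handed to the consumer, tagged with the LSN
// at which it was originally written.
type change struct {
	lsn  uint64
	xid  uint32
	data string
}

// decode buffers changes per transaction and releases a transaction's
// changes only when its commit record is read. This mimics, very loosely,
// how logical decoding hands whole transactions to the output plugin in
// commit order (aborts, subtransactions and streaming are ignored here).
func decode(wal []walRecord) []change {
	pending := make(map[uint32][]change)
	var out []change
	for _, r := range wal {
		if r.commit {
			out = append(out, pending[r.xid]...)
			delete(pending, r.xid)
			continue
		}
		pending[r.xid] = append(pending[r.xid], change{lsn: r.lsn, xid: r.xid, data: r.data})
	}
	return out
}

// sampleWAL returns hypothetical interleaved WAL: transaction 100 writes
// first but commits last.
func sampleWAL() []walRecord {
	return []walRecord{
		{lsn: 10, xid: 100, data: "A"},
		{lsn: 20, xid: 101, data: "B"},
		{lsn: 30, xid: 100, data: "C"},
		{lsn: 40, xid: 101, commit: true},
		{lsn: 50, xid: 100, commit: true},
	}
}

func main() {
	for _, c := range decode(sampleWAL()) {
		fmt.Printf("lsn=%d xid=%d data=%s\n", c.lsn, c.xid, c.data)
	}
}
```

Running it prints the changes with LSNs 20, 10, 30, in that order. The change LSNs go backwards even though nothing is missing, so a decrease or gap in change LSNs cannot, by itself, signal lost data.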
You mentioned the logical replication events but didn't say which ones are part of the same transaction. Let's say A, B, D and K are changes from one transaction, and G, I and P are changes from another. The first transaction becomes available when logical decoding processes K; the second is provided when logical decoding processes P. You didn't say how your consumer works. Are you sure your consumer doesn't get the second transaction? If your consumer advances the replication slot *after* receiving K (using pg_replication_slot_advance), it is doing it wrong. Another common consumer problem is calling pg_logical_slot_get_changes() and then crashing *before* using the data. In this case, the data is lost, because pg_logical_slot_get_changes() consumes the changes, unlike pg_logical_slot_peek_changes(). It is hard to say where the problem is without more information about the consumer logic and the WAL (pg_waldump output) around the time you detect the data loss.

> In sum, the issue comes from the fact that Postgres streams events with unordered LSNs on highly transactional systems, and pgoutput doesn't have enough information to calculate the next or last LSN. So we have no way to check whether we have received all the data we are supposed to receive, and we risk committing an offset that we shouldn't, because we haven't yet received the preceding data. It seems very odd to me that none of the open-source CDC projects I looked into care about this. They always assume that the next LSN received is... well, the next one, and commit that one, so upon restart they are vulnerable to the same issue. So... either I'm missing something, or there is a widespread assumption causing data loss under certain conditions.

Let me illustrate the current behavior. Let's say there are three concurrent transactions.
Session A
==========
euler=# SELECT pg_create_logical_replication_slot('repslot1', 'wal2json');
 pg_create_logical_replication_slot
------------------------------------
 (repslot1,0/369DF088)
(1 row)

euler=# create table foo (a int primary key);
CREATE TABLE
euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(1, 2);
INSERT 0 2

Session B
==========
euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(11, 12);
INSERT 0 2

Session C
==========
euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(21, 22);
INSERT 0 2

Session A
==========
euler=*# INSERT INTO foo (a) VALUES(3);
INSERT 0 1

Session B
==========
euler=*# INSERT INTO foo (a) VALUES(13);
INSERT 0 1

Session C
==========
euler=*# INSERT INTO foo (a) VALUES(23);
INSERT 0 1
euler=*# COMMIT;
COMMIT

Session B
==========
euler=*# COMMIT;
COMMIT

Session A
==========
euler=*# COMMIT;
COMMIT

The output is:

euler=# SELECT * FROM pg_logical_slot_peek_changes('repslot1', NULL, NULL, 'format-version', '2', 'include-types', '0');
    lsn     |  xid   | data
------------+--------+------------------------------------------------------------------------------------
 0/369E4800 | 454539 | {"action":"B"}
 0/36A05088 | 454539 | {"action":"C"}
 0/36A05398 | 454542 | {"action":"B"}
 0/36A05398 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":21}]}
 0/36A05418 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":22}]}
 0/36A05658 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":23}]}
 0/36A057C0 | 454542 | {"action":"C"}
 0/36A05258 | 454541 | {"action":"B"}
 0/36A05258 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":11}]}
 0/36A052D8 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":12}]}
 0/36A05598 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":13}]}
 0/36A057F0 | 454541 | {"action":"C"}
 0/36A050C0 | 454540 | {"action":"B"}
 0/36A050C0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":1}]}
 0/36A051A0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":2}]}
 0/36A054D8 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":3}]}
 0/36A05820 | 454540 | {"action":"C"}
(17 rows)

Since session C committed first, its transaction (454542) is the first one available to the output plugin (wal2json). Transaction 454541 (session B) is the next one available because it committed after session C. The transaction that started first (454540, session A) is the last one available, because it committed last.

Your consumer cannot rely on the LSN position or the xid to track progress. The change LSNs are not monotonic across transactions (for example, 0/36A057C0 is followed by 0/36A05258), and the xids are not delivered in order either. Instead, Postgres provides a replication progress mechanism [1] to do it.

[1] https://www.postgresql.org/docs/current/replication-origins.html

--
Euler Taveira
EDB   https://www.enterprisedb.com/
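José asked for a high-level algorithm. When the target is itself Postgres, replication origins do the bookkeeping: pg_replication_origin_xact_setup() records the source commit LSN as part of the local transaction, and pg_replication_origin_progress() reads it back after a restart.

For a non-Postgres sink, the same idea can be sketched in Go as follows. The sketch assumes the destination can store a transaction's rows and a progress LSN in one atomic step. The key rules are:

- Track progress per committed transaction, never per change.
- Report only the stored position back to the server as flushed.
- On restart, resume from that stored position.

All types and names here (`Begin`, `Insert`, `Commit`, `Sink`, `Consumer`) are hypothetical and loosely modelled on pgoutput messages. They are not the API of pglogrepl or any other client library.

```go
package main

import "fmt"

// LSN is a WAL position; a plain integer here for simplicity.
type LSN uint64

// Hypothetical decoded messages, loosely modelled on pgoutput's
// Begin, Insert and Commit messages (not a real client library's types).
type Begin struct{ Xid uint32 }
type Insert struct{ Row string }
type Commit struct{ EndLSN LSN } // end LSN of the transaction's commit record

// Sink is a hypothetical destination able to store a transaction's rows
// and the progress LSN in one atomic step (e.g. one target-side transaction).
type Sink struct {
	Rows     []string
	Progress LSN // end LSN of the last transaction durably applied
}

// ApplyTx stores the rows and the new progress position together, so a
// crash can never leave one without the other.
func (s *Sink) ApplyTx(rows []string, end LSN) {
	s.Rows = append(s.Rows, rows...)
	s.Progress = end
}

// Consumer buffers the current transaction and applies it only on commit.
type Consumer struct {
	sink *Sink
	buf  []string
}

// Handle processes one decoded message.
func (c *Consumer) Handle(msg any) {
	switch m := msg.(type) {
	case Begin:
		c.buf = nil // start a fresh transaction buffer
	case Insert:
		c.buf = append(c.buf, m.Row)
	case Commit:
		// A transaction at or below the stored progress was already applied
		// before a restart (e.g. re-sent because our acknowledgement never
		// reached the server): skip it instead of duplicating it.
		if m.EndLSN > c.sink.Progress {
			c.sink.ApplyTx(c.buf, m.EndLSN)
		}
		c.buf = nil
	}
}

// FlushLSN is the position to report to the server as flushed and to resume
// from on restart. It only moves after a whole transaction is stored.
func (c *Consumer) FlushLSN() LSN { return c.sink.Progress }

func main() {
	sink := &Sink{}
	c := &Consumer{sink: sink}
	stream := []any{
		Begin{Xid: 1}, Insert{Row: "a"}, Insert{Row: "b"}, Commit{EndLSN: 100},
		Begin{Xid: 2}, Insert{Row: "c"}, Commit{EndLSN: 200},
		// Re-delivery of transaction 2 after a reconnect: skipped.
		Begin{Xid: 2}, Insert{Row: "c"}, Commit{EndLSN: 200},
		// Shutdown arrives mid-transaction: nothing is applied or acknowledged.
		Begin{Xid: 3}, Insert{Row: "d"},
	}
	for _, m := range stream {
		c.Handle(m)
	}
	fmt.Println(sink.Rows, c.FlushLSN())
}
```

This prints `[a b c] 200`. The unfinished transaction 3 leaves the flush position at 200, so after a restart (for example, START_REPLICATION from the stored LSN) the server sends transaction 3 again in full. No "wait 30s for out-of-order events" step is needed.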