I'm working with the logical replication support in psycopg2, and have found
something surprising... this may be my error, of course!
My sample program is below. It works wonderfully, but in the case when it
starts, it re-receives the last message that it handled, even with flushing it.
Example:
postgres@localhost:~/wal2pubsub$ python waltest.py
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
^C
postgres@localhost:~/wal2pubsub$ python waltest.py
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
There was no database activity in that period; it just replayed the same
message. Shouldn't it have flushed to the end of the WAL stream and not
reprocessed the last message?
--
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL
conn = psycopg2.connect('dbname=postgres',
connection_factory=LogicalReplicationConnection)
cur = conn.cursor()
cur.start_replication(slot_name='test_slot', slot_type=REPLICATION_LOGICAL)
from select import select
from datetime import datetime
def consume(msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)
try:
cur.consume_stream(consume)
except:
pass
--
-- Christophe Pettus
[email protected]