Hello,
On 2020-Nov-24, Fujii Masao wrote:
> Thanks for working on this!
> Could you point me to the discussion thread where Chloe Dives reported the
> issue?
> Sorry, I could not find it.
It was not public -- sorry I didn't make that clear.
> I'd like to see the procedure to reproduce the issue.
Here's the script.
Thanks!
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL
def _logical_replication_callback(message):
    """Print one replication message and acknowledge it to the server.

    This function is handed to ``cursor.consume_stream`` in ``main`` and is
    called with each message received on the replication stream.

    Args:
        message: The replication message object delivered by psycopg2. This
            function reads its ``payload``, ``data_start`` and ``cursor``
            attributes.
    """
    # The message is an object, not a string: concatenating it to a str
    # raises TypeError before any feedback is sent. The decoded text of the
    # message lives in its ``payload`` attribute.
    print(f"Raw message: {message.payload}")
    # Report this message's start LSN back to the server as flushed.
    message.cursor.send_feedback(flush_lsn=message.data_start)
def main():
    """Recreate the test replication slot and stream its changes forever.

    Opens a logical-replication connection, drops the slot if it already
    exists, creates it again with the ``test_decoding`` output plugin, starts
    replication with decoded (text) payloads, and hands every incoming
    message to ``_logical_replication_callback``.
    """
    replication_slot = 'snitch_papersnap_testing'

    conn = psycopg2.connect(
        host='fab-devdb02',
        port=5432,
        dbname='postgres',
        user='postgres',
        connection_factory=LogicalReplicationConnection,
    )

    with conn.cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) FROM pg_replication_slots WHERE slot_name = %s",
            (replication_slot,),
        )
        (existing_slots,) = cur.fetchone()

        # Always start from a freshly created slot: drop a leftover one first.
        if existing_slots:
            cur.drop_replication_slot(replication_slot)
        cur.create_replication_slot(
            replication_slot,
            REPLICATION_LOGICAL,
            output_plugin='test_decoding',
        )

        cur.start_replication(replication_slot, REPLICATION_LOGICAL, decode=True)
        print("Logical replication started")

        # Blocks here, invoking the callback once per received message.
        cur.consume_stream(_logical_replication_callback)
# Run the reproduction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()