From c4211450760130d1235216c3d6aef1ea4b6b5f63 Mon Sep 17 00:00:00 2001
From: dilipkumar <dilipbalaut@gmail.com>
Date: Thu, 10 Sep 2020 11:39:27 +0530
Subject: [PATCH v2] Skip printing empty stream in test decoding

---
 contrib/test_decoding/test_decoding.c | 50 ++++++++++++++++++---------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9..c8df868718 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 							  Size sz, const char *message);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+								   TestDecodingData *data,
+								   ReorderBufferTXN *txn,
+								   bool last_write);
 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,16 +587,21 @@ pg_decode_message(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	data->xact_wrote_changes = false;
+	if (data->skip_empty_xacts)
+		return;
+	pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
@@ -601,16 +610,15 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_stop(LogicalDecodingContext *ctx,
 					  ReorderBufferTXN *txn)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_abort(LogicalDecodingContext *ctx,
 					   ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 	OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 
 	if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	/* output stream start if we haven't yet */
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+	data->xact_wrote_changes = true;
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,11 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+	{
+		pg_output_stream_start(ctx, data, txn, false);
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	if (data->include_xids)
 		appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
-- 
2.23.0

