After the recent rash of fixes to the astreamer code, I thought it might be a good idea to take a closer look for more issues. The attached patch proposes six fixes. Full disclosure: I found two of these (those in astreamer_tar_parser_free() and astreamer_extractor_content()), and Claude found the rest. I believe all those it found are indeed things that should be fixed (and backpatched). The wrong data pointer issue is one that I suspect would have been quite hard to find.
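To illustrate why the wrong data pointer is easy to miss, here is a minimal standalone sketch of the pattern. It is not the astreamer code: demo_buffer, demo_buffer_until() and demo_padding() are hypothetical stand-ins, and the helper only assumes the general behaviour described in the commit message, namely that buffering input advances the caller's data pointer past the bytes it consumed.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for a streamer's internal buffer; not the real struct. */
typedef struct
{
	char		data[64];
	int			len;
} demo_buffer;

/*
 * Simplified model of a "buffer until N bytes" helper: copy bytes from *data
 * into buf until buf holds 'target' bytes (target must be <= 64), advancing
 * *data and *len past whatever was consumed.  Returns 1 once the target
 * length has been reached.
 */
static int
demo_buffer_until(demo_buffer *buf, const char **data, int *len, int target)
{
	int			needed = target - buf->len;
	int			take = (*len < needed) ? *len : needed;

	memcpy(buf->data + buf->len, *data, take);
	buf->len += take;
	*data += take;
	*len -= take;
	return buf->len == target;
}

/*
 * Buffer two padding bytes ('P') that are followed by the next header, then
 * report the first byte a consumer would see through each pointer.
 */
static void
demo_padding(char *via_data, char *via_buffer)
{
	const char	input[] = "PPHEADER";
	const char *data = input;
	int			len = (int) strlen(input);
	demo_buffer buf = {{0}, 0};
	int			done = demo_buffer_until(&buf, &data, &len, 2);

	assert(done);
	*via_data = data[0];		/* stale choice: already past the padding */
	*via_buffer = buf.data[0];	/* correct choice: the buffered padding */
}
```

Forwarding through the advanced pointer hands the consumer the start of the next header ('H') instead of the padding ('P'). When the padding is all zero bytes and more input follows, nothing looks obviously wrong downstream.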
cheers

andrew

--
Andrew Dunstan
EDB: https://www.enterprisedb.com
From 4f66842c7870711285b47503e05467a35d4ba499 Mon Sep 17 00:00:00 2001
From: Andrew Dunstan <[email protected]>
Date: Mon, 23 Mar 2026 16:17:08 -0400
Subject: [PATCH v1] Fix multiple bugs in astreamer pipeline code.

astreamer_tar_parser_content() sent the wrong data pointer when
forwarding MEMBER_TRAILER padding to the next streamer.  After
astreamer_buffer_until() buffers the padding bytes, the 'data'
pointer has been advanced past them, but the code passed 'data'
instead of bbs_buffer.data.  This caused the downstream consumer
to receive bytes from after the padding rather than the padding
itself, and could read past the end of the input buffer.

astreamer_gzip_decompressor_content() only checked for
Z_STREAM_ERROR from inflate(), silently ignoring Z_DATA_ERROR
(corrupted data) and Z_MEM_ERROR (out of memory).  Fix by
treating any return other than Z_OK, Z_STREAM_END, and
Z_BUF_ERROR as fatal.  Also break out of the decompression loop
on Z_STREAM_END to avoid calling inflate() on a finished stream.

astreamer_gzip_decompressor_free() never called inflateEnd() to
release zlib's internal decompression state.  Both the lz4 and
zstd decompressor free functions properly release their contexts.

astreamer_tar_parser_free() neglected to pfree() the streamer
struct itself, leaking it.  Every other astreamer free function
frees its own struct.

astreamer_extractor_content() did not check the return value of
fclose() when closing an extracted file.  A deferred write error
(e.g., disk full on buffered I/O) would be silently lost.  The
plain writer's finalize in the same file already checks fclose.
---
 src/fe_utils/astreamer_file.c |  4 +++-
 src/fe_utils/astreamer_gzip.c | 14 ++++++++++++--
 src/fe_utils/astreamer_tar.c  |  4 +++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c
index 6e63a41af0d..158e9a14f2c 100644
--- a/src/fe_utils/astreamer_file.c
+++ b/src/fe_utils/astreamer_file.c
@@ -266,7 +266,9 @@ astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
 		case ASTREAMER_MEMBER_TRAILER:
 			if (mystreamer->file == NULL)
 				break;
-			fclose(mystreamer->file);
+			if (fclose(mystreamer->file) != 0)
+				pg_fatal("could not close file \"%s\": %m",
+						 mystreamer->filename);
 			mystreamer->file = NULL;
 			break;
 
diff --git a/src/fe_utils/astreamer_gzip.c b/src/fe_utils/astreamer_gzip.c
index df392f67cab..440af74bd94 100644
--- a/src/fe_utils/astreamer_gzip.c
+++ b/src/fe_utils/astreamer_gzip.c
@@ -316,8 +316,9 @@ astreamer_gzip_decompressor_content(astreamer *streamer,
 			 */
 			res = inflate(zs, Z_NO_FLUSH);
 
-			if (res == Z_STREAM_ERROR)
-				pg_fatal("could not decompress data: %s", zs->msg);
+			if (res != Z_OK && res != Z_STREAM_END && res != Z_BUF_ERROR)
+				pg_fatal("could not decompress data: %s",
+						 zs->msg ? zs->msg : "unknown error");
 
 			mystreamer->bytes_written =
 				mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
@@ -330,6 +331,10 @@ astreamer_gzip_decompressor_content(astreamer *streamer,
 								  mystreamer->base.bbs_buffer.maxlen, context);
 				mystreamer->bytes_written = 0;
 			}
+
+			/* If we've hit the end of the compressed stream, stop. */
+			if (res == Z_STREAM_END)
+				break;
 		}
 	}
 
@@ -362,7 +367,12 @@ astreamer_gzip_decompressor_finalize(astreamer *streamer)
 static void
 astreamer_gzip_decompressor_free(astreamer *streamer)
 {
+	astreamer_gzip_decompressor *mystreamer;
+
+	mystreamer = (astreamer_gzip_decompressor *) streamer;
+
 	astreamer_free(streamer->bbs_next);
+	inflateEnd(&mystreamer->zstream);
 	pfree(streamer->bbs_buffer.data);
 	pfree(streamer);
 }
diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c
index f8be5e4ff8a..3b094fc0328 100644
--- a/src/fe_utils/astreamer_tar.c
+++ b/src/fe_utils/astreamer_tar.c
@@ -224,7 +224,8 @@ astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
 				/* OK, now we can send it. */
 				astreamer_content(mystreamer->base.bbs_next,
 								  &mystreamer->member,
-								  data, mystreamer->pad_bytes_expected,
+								  mystreamer->base.bbs_buffer.data,
+								  mystreamer->pad_bytes_expected,
 								  ASTREAMER_MEMBER_TRAILER);
 
 				/* Expect next file header. */
@@ -346,6 +347,7 @@ astreamer_tar_parser_free(astreamer *streamer)
 {
 	pfree(streamer->bbs_buffer.data);
 	astreamer_free(streamer->bbs_next);
+	pfree(streamer);
 }
 
 /*
-- 
2.43.0
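As a standalone illustration of the fclose() change, the sketch below shows the general pattern of treating a failed close as a write failure. With buffered stdio, data may only reach the file when the stream is flushed at close time, so an error such as a full disk can first surface there. The helpers demo_close_checked() and demo_write_and_close() are hypothetical names for this example only; the patch itself reports the failure with pg_fatal() rather than returning a status.

```c
#include <assert.h>
#include <stdio.h>

/*
 * Hypothetical helper: close a stream and report failure instead of
 * discarding it.  Returns 0 on success, -1 if fclose() reported an error.
 */
static int
demo_close_checked(FILE *fp, const char *label)
{
	assert(fp != NULL);

	if (fclose(fp) != 0)
	{
		/* errno was set by fclose(); perror() prints "label: reason". */
		perror(label);
		return -1;
	}
	return 0;
}

/*
 * Write a short string to an anonymous temporary file and close it,
 * checking every step.  Returns 0 only if the write and the close both
 * succeeded.
 */
static int
demo_write_and_close(const char *text)
{
	FILE	   *fp = tmpfile();

	if (fp == NULL)
		return -1;

	if (fputs(text, fp) == EOF)
	{
		fclose(fp);				/* already failing; result not needed */
		return -1;
	}

	return demo_close_checked(fp, "temporary file");
}
```

A caller that ignores the result of the final close can report success for a file that was never completely written, which is the situation the patch addresses in astreamer_extractor_content().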
