This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c60381d5 fix(file): In TarReader avoid prematurely completing source (#1442)
7c60381d5 is described below

commit 7c60381d5cda88a925cece3b2ff9e48bbf989e74
Author: Gaël Bréard <[email protected]>
AuthorDate: Mon Mar 2 18:36:55 2026 +0100

    fix(file): In TarReader avoid prematurely completing source (#1442)
    
    * fix(file): In TarReader avoid prematurely completing source when processing multi-file TAR archives
    
    fix #1407
    
    * test(file): In TarReader avoid prematurely completing source when processing multi-file TAR archives
    
    fix #1407
    
    * run sbt scalafmtAll
---
 .../file/impl/archive/TarReaderStage.scala         |  8 +++-
 .../test/scala/docs/scaladsl/TarArchiveSpec.scala  | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
index b6239f3e9..33a0cb789 100644
--- a/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
+++ b/file/src/main/scala/org/apache/pekko/stream/connectors/file/impl/archive/TarReaderStage.scala
@@ -101,8 +101,12 @@ private[file] class TarReaderStage
         val trailerLength = TarArchiveEntry.trailerLength(metadata)
         if (buffer.length >= trailerLength) {
           subSource.foreach(_.complete())
-          if (isClosed(flowIn)) completeStage()
-          readHeader(buffer.drop(trailerLength))
+          val remainingBuffer = buffer.drop(trailerLength)
+          if (remainingBuffer.isEmpty && isClosed(flowIn)) {
+            completeStage() // Safe: no more data to process
+          } else {
+            readHeader(remainingBuffer) // Continue processing
+          }
         } else setHandlers(flowIn, flowOut, new ReadPastTrailer(metadata, buffer, subSource))
       }
 
diff --git a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
index ad7035674..da731fec8 100644
--- a/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/TarArchiveSpec.scala
@@ -327,6 +327,55 @@ class TarArchiveSpec
       tar.futureValue shouldBe empty
     }
 
+    "extract all files when upstream closes with buffered entries (bug 1407)" in {
+      // Bug 1407: TarReaderStage was completing prematurely when the upstream closed
+      // while the buffer still contained unprocessed file headers/data.
+      // This test simulates the scenario where:
+      // 1. A multi-file TAR archive is streamed as a single ByteString
+      // 2. The upstream closes immediately after sending all data
+      // 3. The buffer contains multiple file entries to process
+
+      val file1Content = ByteString("content1")
+      val file2Content = ByteString("content2")
+      val file3Content = ByteString("content3")
+
+      val metadata1 = TarArchiveMetadata("file1.txt", file1Content.length)
+      val metadata2 = TarArchiveMetadata("file2.txt", file2Content.length)
+      val metadata3 = TarArchiveMetadata("file3.txt", file3Content.length)
+
+      // Create a TAR archive with 3 files, emitted as a single ByteString
+      // This simulates an S3 stream that closes after sending all data at once
+      val multiFileArchive = Source(
+        immutable.Seq(
+          metadata1 -> Source.single(file1Content),
+          metadata2 -> Source.single(file2Content),
+          metadata3 -> Source.single(file3Content)
+        ))
+        .via(Archive.tar())
+        .runWith(collectByteString)
+
+      // Process the archive - the upstream will close after the single ByteString
+      // Without the fix, only the first file(s) would be extracted
+      val tar = Source
+        .future(multiFileArchive)
+        .via(Archive.tarReader())
+        .mapAsync(1) {
+          case (metadata, source) =>
+            source.runWith(collectByteString).map { bs =>
+              metadata -> bs
+            }
+        }
+        .runWith(Sink.seq)
+
+      val result = tar.futureValue
+
+      // All 3 files should be extracted, not just the first one
+      (result should have).length(3)
+      result(0) shouldBe metadata1 -> file1Content
+      result(1) shouldBe metadata2 -> file2Content
+      result(2) shouldBe metadata3 -> file3Content
+    }
+
     "fail on missing sub source subscription" in {
       val tar =
         Source


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to