This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/1.1.x by this push:
     new 32f6066bf Make GCS resumableUpload work with empty byte payload
32f6066bf is described below

commit 32f6066bfa2989a50ccd3851c502db762d7ad70d
Author: Matthew de Detrich <[email protected]>
AuthorDate: Mon May 5 12:20:36 2025 +0200

    Make GCS resumableUpload work with empty byte payload
    
    (cherry picked from commit 0cc560d725c41fb73f7225ad74311d920246d5fd)
---
 .../impl/GCStorageStreamIntegrationSpec.scala      | 28 ++++++++++++++++---
 .../private-zero-chunk-change.backwards.excludes   |  1 +
 .../stream/connectors/google/ResumableUpload.scala | 18 ++++++++++---
 .../connectors/google/util/AnnotateLast.scala      | 31 +++++++++++++++++-----
 .../connectors/google/util/AnnotateLastSpec.scala  | 12 +++++++++
 5 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
index 25739460d..cf3337dd0 100644
--- a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
+++ b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
@@ -226,7 +226,7 @@ trait GCStorageStreamIntegrationSpec
     "get a single empty ByteString when downloading a non existing file" in {
       val fileName = testFileName("non-existing-file")
       val res = for {
-        _ <- GCStorageStream
+        storageObject <- GCStorageStream
           .putObject(bucket, fileName, Source.single(ByteString.empty), ContentTypes.`text/plain(UTF-8)`)
           .withAttributes(finalAttributes)
           .runWith(Sink.head)
@@ -236,8 +236,30 @@ trait GCStorageStreamIntegrationSpec
           .runWith(Sink.head)
           .flatMap(
             _.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _ })).getOrElse(Future.successful(ByteString.empty)))
-      } yield res
-      res.futureValue shouldBe ByteString.empty
+      } yield (storageObject, res)
+      val (storageObject, bytes) = res.futureValue
+      bytes shouldBe ByteString.empty
+      storageObject.size shouldBe 0
+      storageObject.md5Hash shouldBe "1B2M2Y8AsgTpgAmY7PhCfg=="
+    }
+
+    "get a single empty ByteString using resumableUpload when downloading a non existing file" in {
+      val fileName = testFileName("non-existing-file")
+      val res = for {
+        storageObject <- Source.single(ByteString.empty)
+          .withAttributes(finalAttributes)
+          .runWith(GCStorageStream.resumableUpload(bucket, fileName, ContentTypes.`text/plain(UTF-8)`))
+        res <- GCStorageStream
+          .download(bucket, fileName)
+          .withAttributes(finalAttributes)
+          .runWith(Sink.head)
+          .flatMap(
+            _.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _ })).getOrElse(Future.successful(ByteString.empty)))
+      } yield (storageObject, res)
+      val (storageObject, bytes) = res.futureValue
+      bytes shouldBe ByteString.empty
+      storageObject.size shouldBe 0
+      storageObject.md5Hash shouldBe "1B2M2Y8AsgTpgAmY7PhCfg=="
     }
 
     "delete an existing file" in {
diff --git a/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes b/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes
new file mode 100644
index 000000000..8dc2f205f
--- /dev/null
+++ b/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes
@@ -0,0 +1 @@
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.stream.connectors.google.ResumableUpload$Chunk$")
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
index 4981f60dd..ac81f034d 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
@@ -17,11 +17,12 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.InternalApi
 import pekko.dispatch.ExecutionContexts
+import pekko.http.scaladsl.model.ContentRange.Unsatisfiable
 import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT }
 import pekko.http.scaladsl.model.StatusCodes.{ Created, OK, PermanentRedirect }
 import pekko.http.scaladsl.model._
 import pekko.http.scaladsl.model.headers.ByteRange.Slice
-import pekko.http.scaladsl.model.headers.{ `Content-Range`, Location, Range, RawHeader }
+import pekko.http.scaladsl.model.headers.{ `Content-Range`, Location, Range, RangeUnits, RawHeader }
 import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal, Unmarshaller }
 import pekko.stream.Materializer
 import pekko.stream.connectors.google.http.GoogleHttp
@@ -40,6 +41,10 @@ private[connectors] object ResumableUpload {
   final case class UploadFailedException() extends Exception
   private final case class Chunk(bytes: ByteString, position: Long)
 
+  private object Chunk {
+    final val zero = Chunk(ByteString.empty, 0)
+  }
+
   /**
    * Initializes and runs a resumable upload to a media endpoint.
    *
@@ -65,7 +70,7 @@ private[connectors] object ResumableUpload {
           .statefulMap(() => 0L)((cumulativeLength, bytes) =>
               (cumulativeLength + bytes.length, Chunk(bytes, cumulativeLength)),
             _ => None)
-          .via(AnnotateLast[Chunk])
+          .via(AnnotateLast[Chunk](Chunk.zero))
           .map(chunk => Future.successful(Right(chunk)))
 
         val upload = Flow
@@ -136,8 +141,13 @@ private[connectors] object ResumableUpload {
       Flow[T].map(t => Success(Some(t))),
       Flow[MaybeLast[Chunk]].map {
         case maybeLast @ MaybeLast(Chunk(bytes, position)) =>
-          val totalLength = if (maybeLast.isLast) Some(position + bytes.length) else None
-          val header = `Content-Range`(ContentRange(position, position + bytes.length - 1, totalLength))
+          val header = if (maybeLast.isLast && bytes.isEmpty && position == 0) {
+            // This branch gets hit when detecting the special sentinel value (always the last element) that gets
+            // inserted in the case of an empty ByteString upload.
+            `Content-Range`(RangeUnits.Bytes, Unsatisfiable(0))
+          } else
+            `Content-Range`(ContentRange(position, position + bytes.length - 1,
+              if (maybeLast.isLast) Some(position + bytes.length) else None))
           request.addHeader(header).withEntity(bytes)
       }.via(pool)).map(_.merge).mapMaterializedValue(_ => NotUsed)
   }
diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
index 9af3982e6..f8a0c0de8 100644
--- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
+++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
@@ -45,14 +45,33 @@ private[google] final case class Last[+T](value: T) extends MaybeLast[T] {
 @InternalApi
 private[google] object AnnotateLast {
 
+  private final def accumulateState[T](maybePreviousElement: Option[T], elem: T) =
+    maybePreviousElement match {
+      case Some(previousElem) => (Some(elem), Some(NotLast(previousElem)))
+      case None               => (Some(elem), None)
+    }
+
   def apply[T]: Flow[T, MaybeLast[T], NotUsed] =
     Flow[T]
-      .statefulMap(() => Option.empty[T])((maybePreviousElement, elem) => {
-          maybePreviousElement match {
-            case Some(previousElem) => (Some(elem), Some(NotLast(previousElem)))
-            case None               => (Some(elem), None)
-          }
-        }, _.map(elem => Some(Last(elem))))
+      .statefulMap(() => Option.empty[T])(
+        accumulateState,
+        _.map(elem => Some(Last(elem)))
+      ).collect {
+        case Some(elem) => elem
+      }
+
+  def apply[T](zero: T): Flow[T, MaybeLast[T], NotUsed] =
+    Flow[T]
+      .statefulMap(() => Option.empty[T])(
+        accumulateState,
+        {
+          case Some(elem) =>
+            Some(Some(Last(elem)))
+          case None =>
+            // This case is hit on an empty stream; we insert a special sentinel value (`zero`) which can
+            // be detected downstream.
+            Some(Some(Last(zero)))
+        })
       .collect {
         case Some(elem) => elem
       }
diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
index b75e795e0..d6653269c 100644
--- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
+++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
@@ -50,6 +50,18 @@ class AnnotateLastSpec
       val probe = Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink.probe)
       probe.expectSubscriptionAndComplete()
     }
+
+    "return zero value when stream is empty using zero apply" in {
+      val probe = Source.empty[Int].via(AnnotateLast[Int](0)).runWith(TestSink.probe)
+      probe.requestNext(Last(0))
+      probe.expectComplete()
+    }
+
+    "don't return zero value if stream is non empty using zero apply" in {
+      val probe = Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink.probe)
+      probe.requestNext(Last(1))
+      probe.expectComplete()
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to