This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch 1.1.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/1.1.x by this push:
new 32f6066bf Make GCS resumableUpload work with empty byte payload
32f6066bf is described below
commit 32f6066bfa2989a50ccd3851c502db762d7ad70d
Author: Matthew de Detrich <[email protected]>
AuthorDate: Mon May 5 12:20:36 2025 +0200
Make GCS resumableUpload work with empty byte payload
(cherry picked from commit 0cc560d725c41fb73f7225ad74311d920246d5fd)
---
.../impl/GCStorageStreamIntegrationSpec.scala | 28 ++++++++++++++++---
.../private-zero-chunk-change.backwards.excludes | 1 +
.../stream/connectors/google/ResumableUpload.scala | 18 ++++++++++---
.../connectors/google/util/AnnotateLast.scala | 31 +++++++++++++++++-----
.../connectors/google/util/AnnotateLastSpec.scala | 12 +++++++++
5 files changed, 77 insertions(+), 13 deletions(-)
diff --git
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
index 25739460d..cf3337dd0 100644
---
a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
+++
b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala
@@ -226,7 +226,7 @@ trait GCStorageStreamIntegrationSpec
"get a single empty ByteString when downloading a non existing file" in {
val fileName = testFileName("non-existing-file")
val res = for {
- _ <- GCStorageStream
+ storageObject <- GCStorageStream
.putObject(bucket, fileName, Source.single(ByteString.empty),
ContentTypes.`text/plain(UTF-8)`)
.withAttributes(finalAttributes)
.runWith(Sink.head)
@@ -236,8 +236,30 @@ trait GCStorageStreamIntegrationSpec
.runWith(Sink.head)
.flatMap(
_.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _
})).getOrElse(Future.successful(ByteString.empty)))
- } yield res
- res.futureValue shouldBe ByteString.empty
+ } yield (storageObject, res)
+ val (storageObjet, bytes) = res.futureValue
+ bytes shouldBe ByteString.empty
+ storageObjet.size shouldBe 0
+ storageObjet.md5Hash shouldBe "1B2M2Y8AsgTpgAmY7PhCfg=="
+ }
+
+ "get a single empty ByteString using resumableUpload when downloading a
non existing file" in {
+ val fileName = testFileName("non-existing-file")
+ val res = for {
+ storageObject <- Source.single(ByteString.empty)
+ .withAttributes(finalAttributes)
+ .runWith(GCStorageStream.resumableUpload(bucket, fileName,
ContentTypes.`text/plain(UTF-8)`))
+ res <- GCStorageStream
+ .download(bucket, fileName)
+ .withAttributes(finalAttributes)
+ .runWith(Sink.head)
+ .flatMap(
+ _.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _
})).getOrElse(Future.successful(ByteString.empty)))
+ } yield (storageObject, res)
+ val (storageObjet, bytes) = res.futureValue
+ bytes shouldBe ByteString.empty
+ storageObjet.size shouldBe 0
+ storageObjet.md5Hash shouldBe "1B2M2Y8AsgTpgAmY7PhCfg=="
}
"delete an existing file" in {
diff --git
a/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes
b/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes
new file mode 100644
index 000000000..8dc2f205f
--- /dev/null
+++
b/google-common/src/main/mima-filters/1.1.x.backwards.excludes/private-zero-chunk-change.backwards.excludes
@@ -0,0 +1 @@
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.stream.connectors.google.ResumableUpload$Chunk$")
diff --git
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
index 4981f60dd..ac81f034d 100644
---
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
+++
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala
@@ -17,11 +17,12 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
+import pekko.http.scaladsl.model.ContentRange.Unsatisfiable
import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT }
import pekko.http.scaladsl.model.StatusCodes.{ Created, OK, PermanentRedirect }
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.ByteRange.Slice
-import pekko.http.scaladsl.model.headers.{ `Content-Range`, Location, Range,
RawHeader }
+import pekko.http.scaladsl.model.headers.{ `Content-Range`, Location, Range,
RangeUnits, RawHeader }
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller,
Unmarshal, Unmarshaller }
import pekko.stream.Materializer
import pekko.stream.connectors.google.http.GoogleHttp
@@ -40,6 +41,10 @@ private[connectors] object ResumableUpload {
final case class UploadFailedException() extends Exception
private final case class Chunk(bytes: ByteString, position: Long)
+ private object Chunk {
+ final val zero = Chunk(ByteString.empty, 0)
+ }
+
/**
* Initializes and runs a resumable upload to a media endpoint.
*
@@ -65,7 +70,7 @@ private[connectors] object ResumableUpload {
.statefulMap(() => 0L)((cumulativeLength, bytes) =>
(cumulativeLength + bytes.length, Chunk(bytes,
cumulativeLength)),
_ => None)
- .via(AnnotateLast[Chunk])
+ .via(AnnotateLast[Chunk](Chunk.zero))
.map(chunk => Future.successful(Right(chunk)))
val upload = Flow
@@ -136,8 +141,13 @@ private[connectors] object ResumableUpload {
Flow[T].map(t => Success(Some(t))),
Flow[MaybeLast[Chunk]].map {
case maybeLast @ MaybeLast(Chunk(bytes, position)) =>
- val totalLength = if (maybeLast.isLast) Some(position +
bytes.length) else None
- val header = `Content-Range`(ContentRange(position, position +
bytes.length - 1, totalLength))
+ val header = if (bytes.isEmpty && position == 0) {
+ // This branch gets hit when detecting a special sentinel value that gets inserted in the case of an empty
+ // ByteString upload.
+ `Content-Range`(RangeUnits.Bytes, Unsatisfiable(0))
+ } else
+ `Content-Range`(ContentRange(position, position + bytes.length - 1,
+ if (maybeLast.isLast) Some(position + bytes.length) else None))
request.addHeader(header).withEntity(bytes)
}.via(pool)).map(_.merge).mapMaterializedValue(_ => NotUsed)
}
diff --git
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
index 9af3982e6..f8a0c0de8 100644
---
a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
+++
b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLast.scala
@@ -45,14 +45,33 @@ private[google] final case class Last[+T](value: T) extends
MaybeLast[T] {
@InternalApi
private[google] object AnnotateLast {
+ private final def accumulateState[T](maybePreviousElement: Option[T], elem:
T) =
+ maybePreviousElement match {
+ case Some(previousElem) => (Some(elem), Some(NotLast(previousElem)))
+ case None => (Some(elem), None)
+ }
+
def apply[T]: Flow[T, MaybeLast[T], NotUsed] =
Flow[T]
- .statefulMap(() => Option.empty[T])((maybePreviousElement, elem) => {
- maybePreviousElement match {
- case Some(previousElem) => (Some(elem),
Some(NotLast(previousElem)))
- case None => (Some(elem), None)
- }
- }, _.map(elem => Some(Last(elem))))
+ .statefulMap(() => Option.empty[T])(
+ accumulateState,
+ _.map(elem => Some(Last(elem)))
+ ).collect {
+ case Some(elem) => elem
+ }
+
+ def apply[T](zero: T): Flow[T, MaybeLast[T], NotUsed] =
+ Flow[T]
+ .statefulMap(() => Option.empty[T])(
+ accumulateState,
+ {
+ case Some(elem) =>
+ Some(Some(Last(elem)))
+ case None =>
+ // This case gets hit on an empty stream, we insert a special sentinel value which can
+ // be detected later.
+ Some(Some(Last(zero)))
+ })
.collect {
case Some(elem) => elem
}
diff --git
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
index b75e795e0..d6653269c 100644
---
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
+++
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
@@ -50,6 +50,18 @@ class AnnotateLastSpec
val probe =
Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink.probe)
probe.expectSubscriptionAndComplete()
}
+
+ "return zero value when stream is empty using zero apply" in {
+ val probe =
Source.empty[Null].via(AnnotateLast[Null](null)).runWith(TestSink.probe)
+ probe.requestNext(Last(null))
+ probe.expectComplete()
+ }
+
+ "don't return zero value if stream is non empty using zero apply" in {
+ val probe =
Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink.probe)
+ probe.requestNext(Last(1))
+ probe.expectComplete()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]