This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 274d704d6b Make automatic flushing of compression configurable
274d704d6b is described below

commit 274d704d6bb918267da846dc97cf43799818cd04
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Nov 9 18:14:11 2025 +0100

    Make automatic flushing of compression configurable
---
 .../pekko/stream/io/compression/CoderSpec.scala    |  3 ++
 .../io/compression/DeflateAutoFlushSpec.scala      | 29 ++++++++++++++
 .../stream/io/compression/GzipAutoFlushSpec.scala  | 29 ++++++++++++++
 .../impl/io/compression/CompressionUtils.scala     | 10 ++++-
 .../apache/pekko/stream/javadsl/Compression.scala  | 39 +++++++++++++++----
 .../apache/pekko/stream/scaladsl/Compression.scala | 45 +++++++++++++++-------
 6 files changed, 132 insertions(+), 23 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
index bed6fef7d9..af7191b0d7 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
@@ -45,6 +45,8 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
   case object AllDataAllowed extends Exception with NoStackTrace
   protected def corruptInputCheck: Boolean = true
 
+  protected def autoFlush: Boolean = true // false for non-flushing encoders; tests needing per-element flushing call assume(autoFlush)
+
   def extraTests(): Unit = {}
 
   s"The $codecName codec" should {
@@ -145,6 +147,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
     }
 
     "be able to decode chunk-by-chunk (depending on input chunks)" in {
+      assume(autoFlush)
       val minLength = 100
       val maxLength = 1000
       val numElements = 1000
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala
new file mode 100644
index 0000000000..284aaa3ae1
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
+import org.apache.pekko.util.ByteString
+
+import java.util.zip.Deflater
+
+class DeflateAutoFlushSpec extends DeflateSpec { // reruns the inherited tests against a deflate encoder with autoFlush disabled
+  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
+    Compression.deflate(Deflater.BEST_COMPRESSION, nowrap = false, autoFlush = false)
+  override protected val autoFlush: Boolean = false // keep in sync with the encoder's autoFlush above
+}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala
new file mode 100644
index 0000000000..031c19428b
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
+import org.apache.pekko.util.ByteString
+
+import java.util.zip.Deflater
+
+class GzipAutoFlushSpec extends GzipSpec { // reruns the inherited tests against a gzip encoder with autoFlush disabled
+  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
+    Compression.gzip(Deflater.BEST_COMPRESSION, autoFlush = false)
+  override protected val autoFlush: Boolean = false // keep in sync with the encoder's autoFlush above
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
index 7ce44d962c..d6150f48f3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
@@ -28,7 +28,8 @@ import pekko.util.ByteString
   /**
    * Creates a flow from a compressor constructor.
    */
-  def compressorFlow(newCompressor: () => Compressor): Flow[ByteString, 
ByteString, NotUsed] =
+  def compressorFlow(newCompressor: () => Compressor, autoFlush: Boolean = 
true)
+      : Flow[ByteString, ByteString, NotUsed] =
     Flow.fromGraph {
       new SimpleLinearGraphStage[ByteString] {
         override def createLogic(inheritedAttributes: Attributes): 
GraphStageLogic =
@@ -36,7 +37,12 @@ import pekko.util.ByteString
             val compressor = newCompressor()
 
             override def onPush(): Unit = {
-              val data = compressor.compressAndFlush(grab(in))
+              val grabbed = grab(in)
+              val data = if (autoFlush)
+                compressor.compressAndFlush(grabbed)
+              else
+                compressor.compress(grabbed)
+
               if (data.nonEmpty) push(out, data)
               else pull(in)
             }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
index b5c4c94a8e..f143528da0 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
@@ -47,10 +47,10 @@ object Compression {
     scaladsl.Compression.inflate(maxBytesPerChunk, nowrap).asJava
 
   /**
-   * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
-   * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
-   * coming out of the flow can be fully decompressed without waiting for additional data. This may
-   * come at a compression performance cost for very small chunks.
+   * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
+   * flush after every element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
+   * coming out of the flow can be fully decompressed without waiting for additional data. This may come at
+   * a compression performance cost for very small chunks (see the overload taking `autoFlush`).
    */
   def gzip: Flow[ByteString, ByteString, NotUsed] =
     scaladsl.Compression.gzip.asJava
@@ -64,10 +64,21 @@ object Compression {
     scaladsl.Compression.gzip(level).asJava
 
   /**
-   * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor
-   * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
-   * coming out of the flow can be fully decompressed without waiting for additional data. This may
-   * come at a compression performance cost for very small chunks.
+   * Same as [[gzip]] with a custom level and configurable flush mode.
+   *
+   * @param level Compression level (0-9)
+   * @param autoFlush If true, flush after every element (as [[gzip]] does); if false, compress without flushing in between.
+   *
+   * @since 1.3.0
+   */
+  def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
+    scaladsl.Compression.gzip(level, autoFlush).asJava
+
+  /**
+   * Creates a flow that deflate-compresses a stream of ByteStrings. Note that the compressor will
+   * flush after every element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
+   * coming out of the flow can be fully decompressed without waiting for additional data. This may come at
+   * a compression performance cost for very small chunks (see the overload taking `autoFlush`).
    */
   def deflate: Flow[ByteString, ByteString, NotUsed] =
     scaladsl.Compression.deflate.asJava
@@ -81,4 +92,16 @@ object Compression {
   def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, 
NotUsed] =
     scaladsl.Compression.deflate(level, nowrap).asJava
 
+  /**
+   * Same as [[deflate]] with configurable level, nowrap and autoFlush.
+   *
+   * @param level Compression level (0-9)
+   * @param nowrap if true then use GZIP compatible compression
+   * @param autoFlush If true, flush after every element (as [[deflate]] does); if false, compress without flushing in between.
+   *
+   * @since 1.3.0
+   */
+  def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
+    scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava
+
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
index c8f889dcd2..ceddb7f70f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
@@ -24,12 +24,10 @@ object Compression {
   final val MaxBytesPerChunkDefault = 64 * 1024
 
   /**
-   * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
-   * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
-   * coming out of the flow can be fully decompressed without waiting for additional data. This may
-   * come at a compression performance cost for very small chunks.
-   *
-   * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
+   * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
+   * flush after every element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
+   * coming out of the flow can be fully decompressed without waiting for additional data. This may come at
+   * a compression performance cost for very small chunks (see the overload taking `autoFlush`).
    */
   def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION)
 
@@ -41,6 +39,17 @@ object Compression {
   def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
     CompressionUtils.compressorFlow(() => new GzipCompressor(level))
 
+  /**
+   * Same as [[gzip]] with a custom level and configurable flush mode.
+   *
+   * @param level Compression level (0-9)
+   * @param autoFlush If true, flush after every element (as [[gzip]] does); if false, compress without flushing in between.
+   *
+   * @since 1.3.0
+   */
+  def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
+    CompressionUtils.compressorFlow(() => new GzipCompressor(level), autoFlush)
+
   /**
    * Creates a Flow that decompresses a gzip-compressed stream of data.
    *
@@ -51,14 +60,12 @@ object Compression {
     Flow[ByteString].via(new 
GzipDecompressor(maxBytesPerChunk)).named("gzipDecompress")
 
   /**
-   * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor
-   * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
-   * coming out of the flow can be fully decompressed without waiting for additional data. This may
-   * come at a compression performance cost for very small chunks.
-   *
-   * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
+   * Creates a flow that deflate-compresses a stream of ByteStrings. Note that the compressor will
+   * flush after every element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
+   * coming out of the flow can be fully decompressed without waiting for additional data. This may come at
+   * a compression performance cost for very small chunks (see the overload taking `autoFlush`).
    */
-  def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false)
+  def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, nowrap = false)
 
   /**
    * Same as [[deflate]] with configurable level and nowrap
@@ -69,6 +76,18 @@ object Compression {
   def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, 
NotUsed] =
     CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap))
 
+  /**
+   * Same as [[deflate]] with configurable level, nowrap and autoFlush.
+   *
+   * @param level Compression level (0-9)
+   * @param nowrap if true then use GZIP compatible compression
+   * @param autoFlush If true, flush after every element (as [[deflate]] does); if false, compress without flushing in between.
+   *
+   * @since 1.3.0
+   */
+  def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
+    CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap), autoFlush)
+
   /**
    * Creates a Flow that decompresses a deflate-compressed stream of data.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to