This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new fde163d63 Google Cloud Pub/Sub gRPC: subscribe() must not echo 
initial-only StreamingPullRequest fields on keepalive ticks (#1625)
fde163d63 is described below

commit fde163d634dec7275979d619423e6f769b725f63
Author: Haruhiko Nishi <[email protected]>
AuthorDate: Wed May 13 19:54:44 2026 +0900

    Google Cloud Pub/Sub gRPC: subscribe() must not echo initial-only 
StreamingPullRequest fields on keepalive ticks (#1625)
    
    * Google Cloud Pub/Sub gRPC: subscribe() must not echo initial-only 
StreamingPullRequest fields on keepalive ticks
    
    * `subscribe(...)` was clearing only `subscription` and 
`streamAckDeadlineSeconds` on the keepalive request, leaving `clientId`, 
`maxOutstandingMessages`, and `maxOutstandingBytes` from the initial one. 
Pub/Sub rejects those three on subsequent requests with `INVALID_ARGUMENT`, so 
any caller setting `maxOutstandingMessages` loses the whole stream about a 
second after start. Fix is one line on each DSL: use 
`StreamingPullRequest.defaultInstance` (Scala) / `getDefaultInstance` (Java) for 
the subsequent (keepalive) request [...]
    
    * Google Cloud Pub/Sub gRPC: replace Thread.sleep with eventually in the 
keepalive-tick test
    
    `Thread.sleep(500)` replaced with `eventually(timeout(Span(5, Seconds)), 
interval(Span(50, Millis)))` polling the captured queue.
    
    * Google Cloud Pub/Sub gRPC: use top-level imports in the test stub
    
    Dropped `scaladsl.`/`pekko.NotUsed` prefixes.
---
 .../pubsub/grpc/javadsl/GooglePubSub.scala         | 11 ++-
 .../pubsub/grpc/scaladsl/GooglePubSub.scala        | 13 +--
 .../pubsub/grpc/AutoExtendAckDeadlinesSpec.scala   | 95 ++++++++++++++++++++--
 3 files changed, 101 insertions(+), 18 deletions(-)

diff --git 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
index edf0f0af8..1e717a69f 100644
--- 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
+++ 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala
@@ -66,10 +66,13 @@ object GooglePubSub {
       .fromMaterializer { (mat, attr) =>
         val cancellable = new CompletableFuture[Cancellable]()
 
-        val subsequentRequest = request.toBuilder
-          .setSubscription("")
-          .setStreamAckDeadlineSeconds(0)
-          .build()
+        // Don't echo initial-only fields on keepalive requests. Pub/Sub 
allows only
+        // ackIds, modifyDeadlineSeconds, and modifyDeadlineAckIds on 
subsequent
+        // StreamingPullRequests; anything else from the initial request 
(subscription,
+        // streamAckDeadlineSeconds, clientId, maxOutstandingMessages, 
maxOutstandingBytes)
+        // gets back INVALID_ARGUMENT. defaultInstance clears them all at once 
and stays
+        // safe if the proto grows more initial-only fields.
+        val subsequentRequest = StreamingPullRequest.getDefaultInstance
 
         subscriber(mat, attr).client
           .streamingPull(
diff --git 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
index d0746242d..4408f8503 100644
--- 
a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
+++ 
b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GooglePubSub.scala
@@ -66,9 +66,13 @@ object GooglePubSub {
       .fromMaterializer { (mat, attr) =>
         val cancellable = Promise[Cancellable]()
 
-        val subsequentRequest = request
-          .withSubscription("")
-          .withStreamAckDeadlineSeconds(0)
+        // Don't echo initial-only fields on keepalive requests. Pub/Sub 
allows only
+        // ackIds, modifyDeadlineSeconds, and modifyDeadlineAckIds on 
subsequent
+        // StreamingPullRequests; anything else from the initial request 
(subscription,
+        // streamAckDeadlineSeconds, clientId, maxOutstandingMessages, 
maxOutstandingBytes)
+        // gets back INVALID_ARGUMENT. defaultInstance clears them all at once 
and stays
+        // safe if the proto grows more initial-only fields.
+        val subsequentRequest = StreamingPullRequest.defaultInstance
 
         subscriber(mat, attr).client
           .streamingPull(
@@ -76,8 +80,7 @@ object GooglePubSub {
               .single(request)
               .concat(
                 Source
-                  .tick(0.seconds, pollInterval, ())
-                  .map(_ => subsequentRequest)
+                  .tick(0.seconds, pollInterval, subsequentRequest)
                   .mapMaterializedValue(cancellable.success)))
           .mapConcat(_.receivedMessages.toVector)
           .mapMaterializedValue(_ => cancellable.future)
diff --git 
a/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
 
b/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
index 1038927df..a4e0eb565 100644
--- 
a/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
+++ 
b/google-cloud-pub-sub-grpc/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/AutoExtendAckDeadlinesSpec.scala
@@ -18,26 +18,30 @@
 package org.apache.pekko.stream.connectors.googlecloud.pubsub.grpc
 
 import org.apache.pekko
-import pekko.Done
+import pekko.{ Done, NotUsed }
 import pekko.actor.ActorSystem
-import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
 import pekko.stream.connectors.googlecloud.pubsub.grpc.scaladsl.{ 
GooglePubSub, GrpcSubscriber, PubSubAttributes }
 import com.google.protobuf.ByteString
 import com.google.pubsub.v1.pubsub._
+import org.apache.pekko.stream._
 
+import java.util.concurrent.ConcurrentLinkedQueue
 import scala.concurrent.{ Future, Promise }
 import scala.concurrent.duration._
-
+import scala.jdk.CollectionConverters._
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.concurrent.{ Eventually, ScalaFutures }
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{ Millis, Seconds, Span }
 import org.scalatest.wordspec.AnyWordSpec
 
 class AutoExtendAckDeadlinesSpec
     extends AnyWordSpec
     with Matchers
     with BeforeAndAfterAll
-    with ScalaFutures {
+    with ScalaFutures
+    with Eventually {
 
   implicit val system: ActorSystem = ActorSystem("AutoExtendAckDeadlinesSpec")
   implicit val patience: PatienceConfig = PatienceConfig(10.seconds, 
100.millis)
@@ -156,17 +160,90 @@ class AutoExtendAckDeadlinesSpec
     }
   }
 
+  "GooglePubSub.subscribe" should {
+    "send only allowed fields on subsequent StreamingPullRequest messages" in {
+      // Regression test: the keepalive tick must NOT echo initial-only fields
+      // (clientId, maxOutstandingMessages, maxOutstandingBytes) back to the 
server,
+      // which would otherwise return INVALID_ARGUMENT on the first tick.
+      val captured = new ConcurrentLinkedQueue[StreamingPullRequest]()
+      val testSubscriber = new GrpcSubscriber(new 
CapturingStreamingPullClient(captured)(system))
+
+      val initial = StreamingPullRequest()
+        .withSubscription(subscription)
+        .withStreamAckDeadlineSeconds(60)
+        .withClientId("test-client")
+        .withMaxOutstandingMessages(100L)
+        .withMaxOutstandingBytes(10485760L)
+
+      val cancellableFut = GooglePubSub
+        .subscribe(initial, 100.millis)
+        .withAttributes(PubSubAttributes.subscriber(testSubscriber))
+        .toMat(Sink.ignore)(Keep.left)
+        .run()
+
+      // Wait until the initial request plus at least 2 keepalive ticks have 
landed,
+      // then cancel.
+      eventually(timeout(Span(5, Seconds)), interval(Span(50, Millis))) {
+        captured.size should be >= 3
+      }
+      cancellableFut.futureValue.cancel()
+
+      val first = captured.poll()
+      first should not be null
+      withClue("initial request must carry caller-supplied fields verbatim: ") 
{
+        first.subscription shouldBe subscription
+        first.streamAckDeadlineSeconds shouldBe 60
+        first.clientId shouldBe "test-client"
+        first.maxOutstandingMessages shouldBe 100L
+        first.maxOutstandingBytes shouldBe 10485760L
+      }
+
+      val subsequent = Iterator
+        .continually(Option(captured.poll()))
+        .takeWhile(_.isDefined)
+        .flatten
+        .toList
+      subsequent should not be empty
+      subsequent.zipWithIndex.foreach { case (req, idx) =>
+        withClue(s"subsequent request #${idx + 1} must be the default 
instance: ") {
+          req.subscription shouldBe ""
+          req.streamAckDeadlineSeconds shouldBe 0
+          req.clientId shouldBe ""
+          req.maxOutstandingMessages shouldBe 0L
+          req.maxOutstandingBytes shouldBe 0L
+        }
+      }
+    }
+  }
+
   override def afterAll(): Unit =
     system.terminate()
 }
 
+/**
+ * Stub that captures every StreamingPullRequest sent on the client → server 
stream
+ * and keeps the response stream open indefinitely (so the polling tick keeps 
firing).
+ */
+class CapturingStreamingPullClient(captured: 
ConcurrentLinkedQueue[StreamingPullRequest])(
+    implicit sys: ActorSystem) extends TestSubscriberClientBase {
+  private implicit val mat: Materializer = Materializer.matFromSystem(sys)
+
+  override def streamingPull(
+      in: Source[StreamingPullRequest, NotUsed])
+      : Source[StreamingPullResponse, NotUsed] =
+    Source
+      .maybe[StreamingPullResponse]
+      .mapMaterializedValue { _ =>
+        in.runForeach(req => captured.add(req))
+        NotUsed
+      }
+}
+
 /**
  * Base stub for [[SubscriberClient]] that provides default implementations
  * for all methods. Tests override only the methods they need.
  */
 trait TestSubscriberClientBase extends SubscriberClient {
-  import pekko.NotUsed
-
   override def createSubscription(in: Subscription): Future[Subscription] = ???
   override def getSubscription(in: GetSubscriptionRequest): 
Future[Subscription] = ???
   override def updateSubscription(in: UpdateSubscriptionRequest): 
Future[Subscription] = ???
@@ -176,8 +253,8 @@ trait TestSubscriberClientBase extends SubscriberClient {
   override def acknowledge(in: AcknowledgeRequest): 
Future[com.google.protobuf.empty.Empty] = ???
   override def pull(in: PullRequest): Future[PullResponse] = ???
   override def streamingPull(
-      in: pekko.stream.scaladsl.Source[StreamingPullRequest, NotUsed])
-      : pekko.stream.scaladsl.Source[StreamingPullResponse, NotUsed] = ???
+      in: Source[StreamingPullRequest, NotUsed])
+      : Source[StreamingPullResponse, NotUsed] = ???
   override def modifyPushConfig(in: ModifyPushConfigRequest): 
Future[com.google.protobuf.empty.Empty] = ???
   override def getSnapshot(in: GetSnapshotRequest): Future[Snapshot] = ???
   override def listSnapshots(in: ListSnapshotsRequest): 
Future[ListSnapshotsResponse] = ???


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to