This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-grpc.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a954161 Send transitive dependencies in server reflection (#643)
7a954161 is described below

commit 7a9541616c15b064e7cb1d9729584b3a332a9031
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 2 14:05:00 2026 +0100

    Send transitive dependencies in server reflection (#643)
    
    * Initial plan
    
    * Copy akka-grpc PR #1719: server reflection transitive dependencies
    
    Co-authored-by: pjfanning <[email protected]>
    
    * change imports
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 build.sbt                                          |  3 +-
 .../pekko/grpc/internal/ServerReflectionImpl.scala | 93 +++++++++++++++-------
 .../pekko/grpc/internal/reflection_test_1.proto    | 11 +++
 .../pekko/grpc/internal/reflection_test_2.proto    |  9 +++
 .../pekko/grpc/internal/reflection_test_3.proto    |  9 +++
 .../pekko/grpc/internal/reflection_test_4.proto    |  6 ++
 .../grpc/internal/ServerReflectionImplSpec.scala   | 91 ++++++++++++++++++---
 7 files changed, 183 insertions(+), 39 deletions(-)

diff --git a/build.sbt b/build.sbt
index 51bc6547..0155c112 100644
--- a/build.sbt
+++ b/build.sbt
@@ -106,7 +106,8 @@ lazy val runtime = Project(id = "runtime", base = file("runtime"))
     AutomaticModuleName.settings("pekko.grpc.runtime"),
     ReflectiveCodeGen.generatedLanguages := Seq("Scala"),
     ReflectiveCodeGen.extraGenerators := Seq("ScalaMarshallersCodeGenerator"),
-    PB.protocVersion := Dependencies.Versions.googleProtoc)
+    PB.protocVersion := Dependencies.Versions.googleProtoc,
+    Test / PB.targets += (scalapb.gen() -> (Test / sourceManaged).value))
   .enablePlugins(org.apache.pekko.grpc.build.ReflectiveCodeGen)
   .enablePlugins(ReproducibleBuildsPlugin)
 
diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/ServerReflectionImpl.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/ServerReflectionImpl.scala
index 198f2ded..c687c09d 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/ServerReflectionImpl.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/ServerReflectionImpl.scala
@@ -32,40 +32,77 @@ import scala.jdk.CollectionConverters._
 final class ServerReflectionImpl private (fileDescriptors: Map[String, 
FileDescriptor], services: List[String])
     extends ServerReflection {
   import ServerReflectionImpl._
+  import ServerReflectionResponse.{ MessageResponse => Out }
 
   private val protoBytesLocalCache: concurrent.Map[String, ByteString] =
     new ConcurrentHashMap[String, ByteString]().asScala
 
   def serverReflectionInfo(in: Source[ServerReflectionRequest, NotUsed]): 
Source[ServerReflectionResponse, NotUsed] = {
-    in.map(req => {
-      import ServerReflectionRequest.{ MessageRequest => In }
-      import ServerReflectionResponse.{ MessageResponse => Out }
-
-      val response = req.messageRequest match {
-        case In.Empty =>
-          Out.Empty
-        case In.FileByFilename(fileName) =>
-          val list = fileDescriptors.get(fileName).map(getProtoBytes).toList
-          Out.FileDescriptorResponse(FileDescriptorResponse(list))
-        case In.FileContainingSymbol(symbol) =>
-          val list = findFileDescForSymbol(symbol, 
fileDescriptors).map(getProtoBytes).toList
-          Out.FileDescriptorResponse(FileDescriptorResponse(list))
-        case In.FileContainingExtension(ExtensionRequest(container, number, 
_)) =>
-          val list = findFileDescForExtension(container, number, 
fileDescriptors).map(getProtoBytes).toList
-          Out.FileDescriptorResponse(FileDescriptorResponse(list))
-        case In.AllExtensionNumbersOfType(container) =>
-          val list =
-            findExtensionNumbersForContainingType(
-              container,
-              fileDescriptors) // TODO should we throw a NOT_FOUND if we don't 
know the container type at all?
-          Out.AllExtensionNumbersResponse(ExtensionNumberResponse(container, 
list))
-        case In.ListServices(_) =>
-          val list = services.map(s => ServiceResponse(s))
-          Out.ListServicesResponse(ListServiceResponse(list))
+    // The server reflection spec requires sending transitive dependencies, 
but allows (and encourages) to only send
+    // transitive dependencies that haven't yet been sent on this stream. So, 
we track this with a stateful map.
+    in.statefulMap(() => Set.empty[String])(
+      (alreadySent, req) => {
+
+        import ServerReflectionRequest.{ MessageRequest => In }
+
+        val (newAlreadySent, response) = req.messageRequest match {
+          case In.Empty =>
+            (alreadySent, Out.Empty)
+          case In.FileByFilename(fileName) =>
+            toFileDescriptorResponse(fileDescriptors.get(fileName), 
alreadySent)
+          case In.FileContainingSymbol(symbol) =>
+            toFileDescriptorResponse(findFileDescForSymbol(symbol, 
fileDescriptors), alreadySent)
+          case In.FileContainingExtension(ExtensionRequest(container, number, 
_)) =>
+            toFileDescriptorResponse(findFileDescForExtension(container, 
number, fileDescriptors), alreadySent)
+          case In.AllExtensionNumbersOfType(container) =>
+            val list =
+              findExtensionNumbersForContainingType(
+                container,
+                fileDescriptors) // TODO should we throw a NOT_FOUND if we 
don't know the container type at all?
+            (alreadySent, 
Out.AllExtensionNumbersResponse(ExtensionNumberResponse(container, list)))
+          case In.ListServices(_) =>
+            val list = services.map(s => ServiceResponse(s))
+            (alreadySent, Out.ListServicesResponse(ListServiceResponse(list)))
+        }
+        // TODO Validate assumptions here
+        (newAlreadySent, ServerReflectionResponse(req.host, Some(req), 
response))
+      },
+      _ => None)
+  }
+
+  private def toFileDescriptorResponse(
+      fileDescriptor: Option[FileDescriptor],
+      alreadySent: Set[String]): (Set[String], Out.FileDescriptorResponse) = {
+    fileDescriptor match {
+      case None =>
+        (alreadySent, Out.FileDescriptorResponse(FileDescriptorResponse()))
+      case Some(file) =>
+        val (newAlreadySent, files) = withTransitiveDeps(alreadySent, file)
+        (newAlreadySent, 
Out.FileDescriptorResponse(FileDescriptorResponse(files.map(getProtoBytes))))
+    }
+  }
+
+  private def withTransitiveDeps(
+      alreadySent: Set[String],
+      file: FileDescriptor): (Set[String], List[FileDescriptor]) = {
+    @annotation.tailrec
+    def iterate(
+        sent: Set[String],
+        results: List[FileDescriptor],
+        toAdd: List[FileDescriptor]): (Set[String], List[FileDescriptor]) = {
+      toAdd match {
+        case Nil => (sent, results)
+        case _   =>
+          // Need to compute the new set of files sent before working out 
which dependencies to send, to ensure
+          // we don't send any dependencies that are being sent in this 
iteration
+          val nowSent = sent ++ toAdd.map(_.getName)
+          val depsOfToAdd =
+            toAdd.flatMap(_.getDependencies.asScala).distinct.filterNot(dep => 
nowSent.contains(dep.getName))
+          iterate(nowSent, toAdd ::: results, depsOfToAdd)
       }
-      // TODO Validate assumptions here
-      ServerReflectionResponse(req.host, Some(req), response)
-    })
+    }
+
+    iterate(alreadySent, Nil, List(file))
   }
 
   private def getProtoBytes(fileDescriptor: FileDescriptor): ByteString =
diff --git a/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_1.proto b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_1.proto
new file mode 100644
index 00000000..67ebc999
--- /dev/null
+++ b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_1.proto
@@ -0,0 +1,11 @@
+syntax = "proto3";
+
+package org.apache.pekko.grpc.internal;
+
+import "org/apache/pekko/grpc/internal/reflection_test_2.proto";
+import "org/apache/pekko/grpc/internal/reflection_test_3.proto";
+
+message MyMessage1 {
+  MyMessage2 field1 = 1;
+  MyMessage3 field2 = 2;
+}
diff --git a/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_2.proto b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_2.proto
new file mode 100644
index 00000000..9dc8093d
--- /dev/null
+++ b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_2.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+
+package org.apache.pekko.grpc.internal;
+
+import "org/apache/pekko/grpc/internal/reflection_test_3.proto";
+
+message MyMessage2 {
+  MyMessage3 field1 = 2;
+}
diff --git a/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_3.proto b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_3.proto
new file mode 100644
index 00000000..9dad5e7b
--- /dev/null
+++ b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_3.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+
+package org.apache.pekko.grpc.internal;
+
+import "org/apache/pekko/grpc/internal/reflection_test_4.proto";
+
+message MyMessage3 {
+  MyMessage4 field1 = 1;
+}
diff --git a/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_4.proto b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_4.proto
new file mode 100644
index 00000000..bcbde9e9
--- /dev/null
+++ b/runtime/src/test/protobuf/org/apache/pekko/grpc/internal/reflection_test_4.proto
@@ -0,0 +1,6 @@
+syntax = "proto3";
+
+package org.apache.pekko.grpc.internal;
+
+message MyMessage4 {
+}
diff --git a/runtime/src/test/scala/org/apache/pekko/grpc/internal/ServerReflectionImplSpec.scala b/runtime/src/test/scala/org/apache/pekko/grpc/internal/ServerReflectionImplSpec.scala
index 9cbed69b..5329c099 100644
--- a/runtime/src/test/scala/org/apache/pekko/grpc/internal/ServerReflectionImplSpec.scala
+++ b/runtime/src/test/scala/org/apache/pekko/grpc/internal/ServerReflectionImplSpec.scala
@@ -15,20 +15,27 @@ package org.apache.pekko.grpc.internal
 
 import org.apache.pekko
 import pekko.actor.ActorSystem
+import pekko.grpc.internal.reflection_test_1.ReflectionTest1Proto
+import pekko.grpc.internal.reflection_test_2.ReflectionTest2Proto
+import pekko.grpc.internal.reflection_test_3.ReflectionTest3Proto
+import pekko.grpc.internal.reflection_test_4.ReflectionTest4Proto
 import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.testkit.TestKit
+import com.google.protobuf.descriptor.FileDescriptorProto
 import io.grpc.reflection.v1.reflection.ServerReflectionRequest.MessageRequest
-import io.grpc.reflection.v1.reflection.{ ServerReflection, 
ServerReflectionRequest }
+import io.grpc.reflection.v1.reflection.{ ServerReflection, 
ServerReflectionRequest, ServerReflectionResponse }
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.OptionValues
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
 
 class ServerReflectionImplSpec
-    extends TestKit(ActorSystem())
+    extends TestKit(ActorSystem("ServerReflectionImplSpec"))
     with AnyWordSpecLike
     with Matchers
     with ScalaFutures
+    with BeforeAndAfterAll
     with OptionValues {
   import ServerReflectionImpl._
   "The Server Reflection implementation utilities" should {
@@ -44,14 +51,22 @@ class ServerReflectionImplSpec
   }
 
   "The Server Reflection implementation" should {
+    val serverReflection =
+      ServerReflectionImpl(
+        Seq(
+          ServerReflection.descriptor,
+          ReflectionTest1Proto.javaDescriptor,
+          ReflectionTest2Proto.javaDescriptor,
+          ReflectionTest3Proto.javaDescriptor,
+          ReflectionTest4Proto.javaDescriptor),
+        List.empty[String])
+
     "retrieve server reflection info" in {
       val serverReflectionRequest = ServerReflectionRequest(messageRequest =
         MessageRequest.FileByFilename("grpc/reflection/v1/reflection.proto"))
 
-      val serverReflectionResponse = 
ServerReflectionImpl(Seq(ServerReflection.descriptor), List.empty[String])
-        .serverReflectionInfo(Source.single(serverReflectionRequest))
-        .runWith(Sink.head)
-        .futureValue
+      val serverReflectionResponse =
+        
serverReflection.serverReflectionInfo(Source.single(serverReflectionRequest)).runWith(Sink.head).futureValue
 
       serverReflectionResponse.messageResponse.listServicesResponse should 
be(empty)
 
@@ -63,13 +78,69 @@ class ServerReflectionImplSpec
       val serverReflectionRequest =
         ServerReflectionRequest(messageRequest = 
MessageRequest.FileByFilename("grpc/reflection/v1/unknown.proto"))
 
-      val serverReflectionResponse = 
ServerReflectionImpl(Seq(ServerReflection.descriptor), List.empty[String])
-        .serverReflectionInfo(Source.single(serverReflectionRequest))
-        .runWith(Sink.head)
-        .futureValue
+      val serverReflectionResponse =
+        
serverReflection.serverReflectionInfo(Source.single(serverReflectionRequest)).runWith(Sink.head).futureValue
 
       serverReflectionResponse.messageResponse.listServicesResponse should 
be(empty)
       
serverReflectionResponse.messageResponse.fileDescriptorResponse.value.fileDescriptorProto
 should be(empty)
     }
+
+    "return transitive dependencies" in {
+      val serverReflectionRequest = ServerReflectionRequest(messageRequest =
+        
MessageRequest.FileByFilename("org/apache/pekko/grpc/internal/reflection_test_1.proto"))
+
+      val serverReflectionResponse =
+        
serverReflection.serverReflectionInfo(Source.single(serverReflectionRequest)).runWith(Sink.head).futureValue
+
+      val protos = decodeFileResponseToNames(serverReflectionResponse)
+      protos should have size 4
+      (protos should contain).allOf(
+        "org/apache/pekko/grpc/internal/reflection_test_1.proto",
+        "org/apache/pekko/grpc/internal/reflection_test_2.proto",
+        "org/apache/pekko/grpc/internal/reflection_test_3.proto",
+        "org/apache/pekko/grpc/internal/reflection_test_4.proto")
+    }
+
+    "not return transitive dependencies already sent" in {
+      val req1 = ServerReflectionRequest(messageRequest =
+        
MessageRequest.FileByFilename("org/apache/pekko/grpc/internal/reflection_test_4.proto"))
+      val req2 = ServerReflectionRequest(messageRequest =
+        
MessageRequest.FileByFilename("org/apache/pekko/grpc/internal/reflection_test_1.proto"))
+      val req3 = ServerReflectionRequest(messageRequest =
+        
MessageRequest.FileByFilename("org/apache/pekko/grpc/internal/reflection_test_2.proto"))
+
+      val responses =
+        serverReflection.serverReflectionInfo(Source(List(req1, req2, 
req3))).runWith(Sink.seq).futureValue
+
+      (responses should have).length(3)
+
+      val protos1 = decodeFileResponseToNames(responses.head)
+      protos1 should have size 1
+      protos1.head shouldBe 
"org/apache/pekko/grpc/internal/reflection_test_4.proto"
+
+      val protos2 = decodeFileResponseToNames(responses(1))
+      // all except 4, because 4 has already been sent
+      protos2 should have size 3
+      (protos2 should contain).allOf(
+        "org/apache/pekko/grpc/internal/reflection_test_1.proto",
+        "org/apache/pekko/grpc/internal/reflection_test_2.proto",
+        "org/apache/pekko/grpc/internal/reflection_test_3.proto")
+
+      val protos3 = decodeFileResponseToNames(responses(2))
+      // should still include 2, because 2 was explicitly requested, but 
should not include anything else
+      // because everything has already been sent
+      protos3 should have size 1
+      protos3.head shouldBe 
"org/apache/pekko/grpc/internal/reflection_test_2.proto"
+
+    }
+
+  }
+
+  private def decodeFileResponseToNames(response: ServerReflectionResponse): 
Seq[String] =
+    
response.messageResponse.fileDescriptorResponse.value.fileDescriptorProto.map(bs
 =>
+      FileDescriptorProto.parseFrom(bs.newCodedInput()).name.getOrElse(""))
+
+  override protected def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to