This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fac11c494706 [SPARK-55291][CONNECT] Pre-process metadata headers at client interceptor construction time
fac11c494706 is described below

commit fac11c494706f351a51143cc9b7edc0ca0f28bcc
Author: Yihong He <[email protected]>
AuthorDate: Thu Feb 5 00:01:31 2026 +0800

    [SPARK-55291][CONNECT] Pre-process metadata headers at client interceptor construction time
    
    ### What changes were proposed in this pull request?
    
    Refactored MetadataHeaderClientInterceptor to pre-process metadata headers (key creation and Base64 decoding) at construction time instead of on every RPC call.
    
    ### Why are the changes needed?
    
    - Avoids redundant processing on every RPC
    - Catches invalid Base64-encoded binary headers earlier at construction time
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite -- -z SPARK-55243"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #54074 from heyihong/SPARK-55291.
    
    Authored-by: Yihong He <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../connect/client/SparkConnectClientSuite.scala   |  6 ++---
 .../sql/connect/client/SparkConnectClient.scala    | 29 +++++++++++++++-------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 1f0e7b89ddc7..1cfc2d3eb09f 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -693,11 +693,9 @@ class SparkConnectClientSuite extends ConnectFunSuite {
       assert(new String(bytes, UTF_8) == binaryData)
     }
 
-    // Non base64-encoded -bin header throws IllegalArgumentException.
-    client = buildClientWithHeader(keyName, binaryData)
-
+    // Non base64-encoded -bin header throws IllegalArgumentException at construction time.
     assertThrows[IllegalArgumentException] {
-      client.execute(plan)
+      buildClientWithHeader(keyName, binaryData)
     }
 
     // Non -bin headers keep using the ASCII marshaller.
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index cac43c2cb67c..0fa7d9ada48b 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -1094,7 +1094,23 @@ object SparkConnectClient {
    */
   private[client] class MetadataHeaderClientInterceptor(metadata: Map[String, String])
       extends ClientInterceptor {
-    metadata.foreach { case (key, value) => assert(key != null && value != null) }
+
+    // Sealed trait for pre-processed metadata entries
+    private sealed trait MetadataEntry
+    private case class AsciiEntry(key: Metadata.Key[String], value: String) extends MetadataEntry
+    private case class BinaryEntry(key: Metadata.Key[Array[Byte]], value: Array[Byte])
+        extends MetadataEntry
+
+    // Pre-process metadata at construction time
+    private val entries: Seq[MetadataEntry] = metadata.map { case (key, value) =>
+      assert(key != null && value != null)
+      if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+        val valueByteArray = Base64.getDecoder.decode(value.getBytes(UTF_8))
+        BinaryEntry(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), valueByteArray)
+      } else {
+        AsciiEntry(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
+      }
+    }.toSeq
 
     override def interceptCall[ReqT, RespT](
         method: MethodDescriptor[ReqT, RespT],
@@ -1105,14 +1121,9 @@ object SparkConnectClient {
         override def start(
             responseListener: ClientCall.Listener[RespT],
             headers: Metadata): Unit = {
-          metadata.foreach { case (key, value) =>
-            if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
-              // Expects a base64-encoded value string.
-              val valueByteArray = Base64.getDecoder.decode(value.getBytes(UTF_8))
-              headers.put(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), valueByteArray)
-            } else {
-              headers.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
-            }
+          entries.foreach {
+            case AsciiEntry(key, value) => headers.put(key, value)
+            case BinaryEntry(key, value) => headers.put(key, value)
           }
           super.start(responseListener, headers)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to