This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 64642b48351 [SPARK-43530][PROTOBUF] Read descriptor file only once
64642b48351 is described below

commit 64642b48351c0c4ef8f40ce7902b85b6f953bd8f
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Sat May 27 23:18:54 2023 +0800

    [SPARK-43530][PROTOBUF] Read descriptor file only once
    
    ### What changes were proposed in this pull request?
    
    Protobuf functions (`from_protobuf()` & `to_protobuf()`) take the file path 
of a descriptor file and use it to construct Protobuf descriptors.
    The main problem with this is that the file is read many times (e.g. at 
each executor). This is unnecessary and error prone. For example, the file 
contents may be updated a couple of days after a streaming query starts, which 
could lead to various errors.
    
    **The fix**: Use the byte content (which is a serialized `FileDescriptorSet` 
proto). We read the content from the file once and carry the byte buffer.
    
    This also adds a new API that accepts the byte buffer directly. This is 
useful when users fetch the content themselves and pass it to the Protobuf 
functions. For example, they could fetch it from S3, or extract it from Python 
Protobuf classes.
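    
    For illustration, a minimal sketch of the new byte-based overloads added in 
`functions.scala` (the descriptor path, DataFrame `df`, and message name below 
are hypothetical):
    
    ```scala
    import java.nio.file.{Files, Paths}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.{functions => pbFn}

    // Read the serialized FileDescriptorSet once on the driver.
    // (It could also be fetched from S3 or built with Python Protobuf classes.)
    val descriptorBytes: Array[Byte] =
      Files.readAllBytes(Paths.get("/tmp/common.desc"))

    // df is assumed to have a binary column "bytes" containing Protobuf payloads.
    val parsed = df.select(
      pbFn.from_protobuf(col("bytes"), "StorageLevel", descriptorBytes).as("msg"))
    val reserialized = parsed.select(
      pbFn.to_protobuf(col("msg"), "StorageLevel", descriptorBytes).as("bytes"))
    ```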
    
    **Note to reviewers**: This includes a lot of updates to test files, mainly 
because of the interface change to pass the buffer. I have left a few PR 
comments to help with the review.
    
    ### Why are the changes needed?
    Described above.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this adds two new versions of the `from_protobuf()` and `to_protobuf()` 
APIs that take serialized Protobuf descriptor bytes rather than a file path.
    
    ### How was this patch tested?
     - Unit tests
    
    Closes #41192 from rangadi/proto-file-buffer.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../org/apache/spark/sql/protobuf/functions.scala  | 135 +++++++++++++++++++--
 .../org/apache/spark/sql/FunctionTestSuite.scala   |  12 +-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   7 +-
 ..._protobuf_messageClassName_descFilePath.explain |   2 +-
 ...f_messageClassName_descFilePath_options.explain |   2 +-
 ..._protobuf_messageClassName_descFilePath.explain |   2 +-
 ...f_messageClassName_descFilePath_options.explain |   2 +-
 ...rom_protobuf_messageClassName_descFilePath.json |   2 +-
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 156 -> 361 bytes
 ...obuf_messageClassName_descFilePath_options.json |   2 +-
 ...messageClassName_descFilePath_options.proto.bin | Bin 206 -> 409 bytes
 .../to_protobuf_messageClassName_descFilePath.json |   2 +-
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 154 -> 359 bytes
 ...obuf_messageClassName_descFilePath_options.json |   2 +-
 ...messageClassName_descFilePath_options.proto.bin | Bin 204 -> 407 bytes
 .../sql/connect/planner/SparkConnectPlanner.scala  |  33 ++---
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |   4 +-
 .../sql/protobuf/CatalystDataToProtobuf.scala      |   7 +-
 .../sql/protobuf/ProtobufDataToCatalyst.scala      |  14 +--
 .../org/apache/spark/sql/protobuf/functions.scala  | 114 +++++++++++++++--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   |  67 +++++-----
 .../ProtobufCatalystDataConversionSuite.scala      |  29 ++---
 .../sql/protobuf/ProtobufFunctionsSuite.scala      |  65 +++++-----
 .../spark/sql/protobuf/ProtobufSerdeSuite.scala    |  30 +++--
 core/src/main/resources/error/error-classes.json   |   7 +-
 docs/sql-error-conditions.md                       |   6 -
 .../spark/sql/errors/QueryCompilationErrors.scala  |  11 +-
 27 files changed, 393 insertions(+), 164 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
index c42f8417155..57ce013065e 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
@@ -16,10 +16,19 @@
  */
 package org.apache.spark.sql.protobuf
 
+import java.io.File
+import java.io.FileNotFoundException
+import java.nio.file.NoSuchFileException
+import java.util.Collections
+
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.functions.{fnWithOptions, lit}
 
 // scalastyle:off: object.name
@@ -35,7 +44,8 @@ object functions {
    * @param messageName
    *   the protobuf message name to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @param options
    * @since 3.5.0
    */
@@ -45,12 +55,36 @@ object functions {
       messageName: String,
       descFilePath: String,
       options: java.util.Map[String, String]): Column = {
+    val binaryFileDescSet = readDescriptorFileContent(descFilePath)
+    from_protobuf(data, messageName, binaryFileDescSet, options)
+  }
+
+  /**
+   * Converts a binary column of Protobuf format into its corresponding 
catalyst value.The
+   * Protobuf definition is provided through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   * @param options
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_protobuf(
+      data: Column,
+      messageName: String,
+      binaryFileDescriptorSet: Array[Byte],
+      options: java.util.Map[String, String]): Column = {
     fnWithOptions(
       "from_protobuf",
       options.asScala.iterator,
       data,
       lit(messageName),
-      lit(descFilePath))
+      lit(binaryFileDescriptorSet))
   }
 
   /**
@@ -62,12 +96,13 @@ object functions {
    * @param messageName
    *   the protobuf MessageName to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @since 3.5.0
    */
   @Experimental
   def from_protobuf(data: Column, messageName: String, descFilePath: String): 
Column = {
-    Column.fn("from_protobuf", data, lit(messageName), lit(descFilePath))
+    from_protobuf(data, messageName, descFilePath, emptyOptions)
   }
 
   /**
@@ -90,6 +125,27 @@ object functions {
     Column.fn("from_protobuf", data, lit(messageClassName))
   }
 
+  /**
+   * Converts a binary column of Protobuf format into its corresponding 
catalyst value.The
+   * Protobuf definition is provided through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_protobuf(
+      data: Column,
+      messageName: String,
+      binaryFileDescriptorSet: Array[Byte]): Column = {
+    from_protobuf(data, messageName, binaryFileDescriptorSet, emptyOptions)
+  }
+
   /**
    * Converts a binary column of Protobuf format into its corresponding 
catalyst value.
    * `messageClassName` points to Protobuf Java class. The jar containing Java 
class should be
@@ -123,12 +179,35 @@ object functions {
    * @param messageName
    *   the protobuf MessageName to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @since 3.5.0
    */
   @Experimental
   def to_protobuf(data: Column, messageName: String, descFilePath: String): 
Column = {
-    Column.fn("to_protobuf", data, lit(messageName), lit(descFilePath))
+    to_protobuf(data, messageName, descFilePath, emptyOptions)
+  }
+
+  /**
+   * Converts a column into binary of protobuf format.The Protobuf definition 
is provided through
+   * Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_protobuf(
+      data: Column,
+      messageName: String,
+      binaryFileDescriptorSet: Array[Byte]): Column = {
+    to_protobuf(data, messageName, binaryFileDescriptorSet, emptyOptions)
   }
 
   /**
@@ -140,7 +219,8 @@ object functions {
    * @param messageName
    *   the protobuf MessageName to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @param options
    * @since 3.5.0
    */
@@ -150,12 +230,36 @@ object functions {
       messageName: String,
       descFilePath: String,
       options: java.util.Map[String, String]): Column = {
+    val binaryFileDescriptorSet = readDescriptorFileContent(descFilePath)
+    to_protobuf(data, messageName, binaryFileDescriptorSet, options)
+  }
+
+  /**
+   * Converts a column into binary of protobuf format.The Protobuf definition 
is provided through
+   * Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   * @param options
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_protobuf(
+      data: Column,
+      messageName: String,
+      binaryFileDescriptorSet: Array[Byte],
+      options: java.util.Map[String, String]): Column = {
     fnWithOptions(
       "to_protobuf",
       options.asScala.iterator,
       data,
       lit(messageName),
-      lit(descFilePath))
+      lit(binaryFileDescriptorSet))
   }
 
   /**
@@ -199,4 +303,19 @@ object functions {
       options: java.util.Map[String, String]): Column = {
     fnWithOptions("to_protobuf", options.asScala.iterator, data, 
lit(messageClassName))
   }
+
+  private def emptyOptions: java.util.Map[String, String] = 
Collections.emptyMap[String, String]()
+
+  // This method is copied from 
org.apache.spark.sql.protobuf.util.ProtobufUtils
+  private def readDescriptorFileContent(filePath: String): Array[Byte] = {
+    try {
+      FileUtils.readFileToByteArray(new File(filePath))
+    } catch {
+      case ex: FileNotFoundException =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)
+      case ex: NoSuchFileException =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)
+      case NonFatal(ex) => throw 
QueryCompilationErrors.descriptorParseError(ex)
+    }
+  }
 }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index c92f77e8144..32004b6bcc1 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -238,12 +238,16 @@ class FunctionTestSuite extends ConnectFunSuite {
       Collections.emptyMap[String, String]))
   testEquals(
     "from_protobuf",
-    pbFn.from_protobuf(a, "FakeMessage", "fakePath", Map.empty[String, 
String].asJava),
-    pbFn.from_protobuf(a, "FakeMessage", "fakePath"))
+    pbFn.from_protobuf(
+      a,
+      "FakeMessage",
+      "fakeBytes".getBytes(),
+      Map.empty[String, String].asJava),
+    pbFn.from_protobuf(a, "FakeMessage", "fakeBytes".getBytes()))
   testEquals(
     "to_protobuf",
-    pbFn.to_protobuf(a, "FakeMessage", "fakePath", Map.empty[String, 
String].asJava),
-    pbFn.to_protobuf(a, "FakeMessage", "fakePath"))
+    pbFn.to_protobuf(a, "FakeMessage", "fakeBytes".getBytes(), 
Map.empty[String, String].asJava),
+    pbFn.to_protobuf(a, "FakeMessage", "fakeBytes".getBytes()))
 
   test("assert_true no message") {
     val e = assert_true(a).expr
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 94b9adda655..607db2ee086 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.connect.client.SparkConnectClient
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.connect.client.util.IntegrationTestUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.protobuf.{functions => pbFn}
@@ -2255,10 +2256,8 @@ class PlanGenerationTestSuite
   //  1. cd connector/connect/common/src/main/protobuf/spark/connect
   //  2. protoc --include_imports 
--descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc 
common.proto
   // scalastyle:on line.size.limit
-  private val testDescFilePath: String = java.nio.file.Paths
-    .get("../", "common", "src", "test", "resources", "protobuf-tests")
-    .resolve("common.desc")
-    .toString
+  private val testDescFilePath: String = 
s"${IntegrationTestUtils.sparkHome}/connector/" +
+    "connect/common/src/test/resources/protobuf-tests/common.desc"
 
   test("from_protobuf messageClassName") {
     binary.select(pbFn.from_protobuf(fn.col("bytes"), 
classOf[StorageLevel].getName))
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain
index b3eaa5f44d6..6eb4805b4fc 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain
@@ -1,2 +1,2 @@
-Project [from_protobuf(bytes#0, StorageLevel, 
Some(../common/src/test/resources/protobuf-tests/common.desc)) AS 
from_protobuf(bytes)#0]
+Project [from_protobuf(bytes#0, StorageLevel, Some([B)) AS 
from_protobuf(bytes)#0]
 +- LocalRelation <empty>, [id#0L, bytes#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain
index 85851e14da3..c4a47b1aef0 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain
@@ -1,2 +1,2 @@
-Project [from_protobuf(bytes#0, StorageLevel, 
Some(../common/src/test/resources/protobuf-tests/common.desc), 
(recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0]
+Project [from_protobuf(bytes#0, StorageLevel, Some([B), 
(recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0]
 +- LocalRelation <empty>, [id#0L, bytes#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain
index 55b53cfcdb1..7c688cc4469 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain
@@ -1,2 +1,2 @@
-Project [to_protobuf(bytes#0, StorageLevel, 
Some(../common/src/test/resources/protobuf-tests/common.desc)) AS 
to_protobuf(bytes)#0]
+Project [to_protobuf(bytes#0, StorageLevel, Some([B)) AS to_protobuf(bytes)#0]
 +- LocalRelation <empty>, [id#0L, bytes#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain
index eafd4124d8f..9f05bb03c9c 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain
@@ -1,2 +1,2 @@
-Project [to_protobuf(bytes#0, StorageLevel, 
Some(../common/src/test/resources/protobuf-tests/common.desc), 
(recursive.fields.max.depth,2)) AS to_protobuf(bytes)#0]
+Project [to_protobuf(bytes#0, StorageLevel, Some([B), 
(recursive.fields.max.depth,2)) AS to_protobuf(bytes)#0]
 +- LocalRelation <empty>, [id#0L, bytes#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json
index 05e210434cd..375c0f9324c 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json
@@ -24,7 +24,7 @@
           }
         }, {
           "literal": {
-            "string": "../common/src/test/resources/protobuf-tests/common.desc"
+            "binary": 
"CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z"
           }
         }]
       }
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin
index c4c2b2126b8..07d4c6c5b28 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json
index ff747ede6f1..db9371b64ef 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json
@@ -24,7 +24,7 @@
           }
         }, {
           "literal": {
-            "string": "../common/src/test/resources/protobuf-tests/common.desc"
+            "binary": 
"CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z"
           }
         }, {
           "unresolvedFunction": {
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin
index 640df1c8d1b..00fd58da6be 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json
index e5bbb2ba57d..0843b469384 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json
@@ -24,7 +24,7 @@
           }
         }, {
           "literal": {
-            "string": "../common/src/test/resources/protobuf-tests/common.desc"
+            "binary": 
"CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z"
           }
         }]
       }
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin
index 346fc3d90f3..c3fe14aef47 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json
index 183c8a25ce1..76307b3141f 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json
@@ -24,7 +24,7 @@
           }
         }, {
           "literal": {
-            "string": "../common/src/test/resources/protobuf-tests/common.desc"
+            "binary": 
"CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z"
           }
         }, {
           "unresolvedFunction": {
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin
index cc4d08ee581..a387611c1ad 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin
 differ
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index be13578d19c..10528843e40 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1451,32 +1451,33 @@ class SparkConnectPlanner(val session: SparkSession) {
     def extractArgsOfProtobufFunction(
         functionName: String,
         argumentsCount: Int,
-        children: Seq[Expression]): (String, Option[String], Map[String, 
String]) = {
+        children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, 
String]) = {
       val messageClassName = children(1) match {
         case Literal(s, StringType) if s != null => s.toString
         case other =>
           throw InvalidPlanInput(
             s"MessageClassName in $functionName should be a literal string, 
but got $other")
       }
-      val (descFilePathOpt, options) = if (argumentsCount == 2) {
+      val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) {
         (None, Map.empty[String, String])
       } else if (argumentsCount == 3) {
         children(2) match {
-          case Literal(s, StringType) if s != null =>
-            (Some(s.toString), Map.empty[String, String])
+          case Literal(b, BinaryType) if b != null =>
+            (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String])
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
             (None, ExprUtils.convertToMapData(CreateMap(arguments)))
           case other =>
             throw InvalidPlanInput(
-              s"The valid type for the 3rd arg in $functionName is string or 
map, but got $other")
+              s"The valid type for the 3rd arg in $functionName " +
+                s"is binary or map, but got $other")
         }
       } else if (argumentsCount == 4) {
-        val filePathOpt = children(2) match {
-          case Literal(s, StringType) if s != null =>
-            Some(s.toString)
+        val fileDescSetOpt = children(2) match {
+          case Literal(b, BinaryType) if b != null =>
+            Some(b.asInstanceOf[Array[Byte]])
           case other =>
             throw InvalidPlanInput(
-              s"DescFilePath in $functionName should be a literal string, but 
got $other")
+              s"DescFilePath in $functionName should be a literal binary, but 
got $other")
         }
         val map = children(3) match {
           case UnresolvedFunction(Seq("map"), arguments, _, _, _) =>
@@ -1485,12 +1486,12 @@ class SparkConnectPlanner(val session: SparkSession) {
             throw InvalidPlanInput(
               s"Options in $functionName should be created by map, but got 
$other")
         }
-        (filePathOpt, map)
+        (fileDescSetOpt, map)
       } else {
         throw InvalidPlanInput(
           s"$functionName requires 2 ~ 4 arguments, but got $argumentsCount 
ones!")
       }
-      (messageClassName, descFilePathOpt, options)
+      (messageClassName, binaryFileDescSetOpt, options)
     }
 
     fun.getFunctionName match {
@@ -1648,15 +1649,17 @@ class SparkConnectPlanner(val session: SparkSession) {
       // Protobuf-specific functions
       case "from_protobuf" if Seq(2, 3, 4).contains(fun.getArgumentsCount) =>
         val children = 
fun.getArgumentsList.asScala.toSeq.map(transformExpression)
-        val (messageClassName, descFilePathOpt, options) =
+        val (messageClassName, binaryFileDescSetOpt, options) =
           extractArgsOfProtobufFunction("from_protobuf", 
fun.getArgumentsCount, children)
-        Some(ProtobufDataToCatalyst(children.head, messageClassName, 
descFilePathOpt, options))
+        Some(
+          ProtobufDataToCatalyst(children.head, messageClassName, 
binaryFileDescSetOpt, options))
 
       case "to_protobuf" if Seq(2, 3, 4).contains(fun.getArgumentsCount) =>
         val children = 
fun.getArgumentsList.asScala.toSeq.map(transformExpression)
-        val (messageClassName, descFilePathOpt, options) =
+        val (messageClassName, binaryFileDescSetOpt, options) =
           extractArgsOfProtobufFunction("to_protobuf", fun.getArgumentsCount, 
children)
-        Some(CatalystDataToProtobuf(children.head, messageClassName, 
descFilePathOpt, options))
+        Some(
+          CatalystDataToProtobuf(children.head, messageClassName, 
binaryFileDescSetOpt, options))
 
       case _ => None
     }
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 64cbfbb6952..1d8e8446c45 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -195,7 +195,9 @@ class ProtoToParsedPlanTestSuite
   }
 
   private def removeMemoryAddress(expr: String): String = {
-    expr.replaceAll("@[0-9a-f]+,", ",")
+    expr
+      .replaceAll("@[0-9a-f]+,", ",")
+      .replaceAll("@[0-9a-f]+\\)", ")")
   }
 
   private def readRelation(path: Path): proto.Relation = {
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
index 62d251fb869..8805d935093 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala
@@ -26,14 +26,17 @@ import org.apache.spark.sql.types.{BinaryType, DataType}
 private[sql] case class CatalystDataToProtobuf(
     child: Expression,
     messageName: String,
-    descFilePath: Option[String] = None,
+    binaryFileDescriptorSet: Option[Array[Byte]] = None,
     options: Map[String, String] = Map.empty)
     extends UnaryExpression {
 
+  // TODO(SPARK-43578): binaryFileDescriptorSet could be very large in some 
cases. It is better
+  //                    to broadcast it so that it is not transferred with 
each task.
+
   override def dataType: DataType = BinaryType
 
   @transient private lazy val protoDescriptor =
-    ProtobufUtils.buildDescriptor(messageName, descFilePathOpt = descFilePath)
+    ProtobufUtils.buildDescriptor(messageName, binaryFileDescriptorSet)
 
   @transient private lazy val serializer =
     new ProtobufSerializer(child.dataType, protoDescriptor, child.nullable)
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index 128169f46a2..5c4a5ff0689 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{AbstractDataType, 
BinaryType, DataType, Struc
 private[sql] case class ProtobufDataToCatalyst(
     child: Expression,
     messageName: String,
-    descFilePath: Option[String] = None,
+    binaryFileDescriptorSet: Option[Array[Byte]] = None,
     options: Map[String, String] = Map.empty)
     extends UnaryExpression
     with ExpectsInputTypes {
@@ -55,19 +55,15 @@ private[sql] case class ProtobufDataToCatalyst(
   private lazy val protobufOptions = ProtobufOptions(options)
 
   @transient private lazy val messageDescriptor =
-    ProtobufUtils.buildDescriptor(messageName, descFilePath)
-    // TODO: Avoid carrying the file name. Read the contents of descriptor 
file only once
-    //       at the start. Rest of the runs should reuse the buffer. 
Otherwise, it could
-    //       cause inconsistencies if the file contents are changed the user 
after a few days.
-    //       Same for the write side in [[CatalystDataToProtobuf]].
+    ProtobufUtils.buildDescriptor(messageName, binaryFileDescriptorSet)
 
   @transient private lazy val fieldsNumbers =
     messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet
 
   @transient private lazy val deserializer = {
-    val typeRegistry = descFilePath match {
-      case Some(path) if protobufOptions.convertAnyFieldsToJson =>
-        ProtobufUtils.buildTypeRegistry(path) // This loads all the messages 
in the file.
+    val typeRegistry = binaryFileDescriptorSet match {
+      case Some(descBytes) if protobufOptions.convertAnyFieldsToJson =>
+        ProtobufUtils.buildTypeRegistry(descBytes) // This loads all the 
messages in the desc set.
       case None if protobufOptions.convertAnyFieldsToJson =>
         ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only 
connected messages.
       case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion 
is not enabled.
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
index 8056082c66f..6a33dfa1da1 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 
 // scalastyle:off: object.name
 object functions {
@@ -34,7 +35,8 @@ object functions {
    * @param messageName
    *   the protobuf message name to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @param options
    * @since 3.4.0
    */
@@ -44,8 +46,34 @@ object functions {
       messageName: String,
       descFilePath: String,
       options: java.util.Map[String, String]): Column = {
+    val descriptorFileContent = 
ProtobufUtils.readDescriptorFileContent(descFilePath)
+    from_protobuf(data, messageName, descriptorFileContent, options)
+  }
+
+  /**
+   * Converts a binary column of Protobuf format into its corresponding 
catalyst value.The
+   * Protobuf definition is provided through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   *   @param options
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_protobuf(
+    data: Column,
+    messageName: String,
+    binaryFileDescriptorSet: Array[Byte],
+    options: java.util.Map[String, String]): Column = {
     new Column(
-      ProtobufDataToCatalyst(data.expr, messageName, Some(descFilePath), 
options.asScala.toMap)
+      ProtobufDataToCatalyst(
+        data.expr, messageName, Some(binaryFileDescriptorSet), 
options.asScala.toMap
+      )
     )
   }
 
@@ -58,14 +86,33 @@ object functions {
    * @param messageName
    *   the protobuf MessageName to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @since 3.4.0
    */
   @Experimental
   def from_protobuf(data: Column, messageName: String, descFilePath: String): 
Column = {
-    new Column(ProtobufDataToCatalyst(data.expr, messageName, descFilePath = 
Some(descFilePath)))
-    // TODO: Add an option for user to provide descriptor file content as a 
buffer. This
-    //       gives flexibility in how the content is fetched.
+    val fileContent = ProtobufUtils.readDescriptorFileContent(descFilePath)
+    new Column(ProtobufDataToCatalyst(data.expr, messageName, 
Some(fileContent)))
+  }
+
+  /**
+   * Converts a binary column of Protobuf format into its corresponding 
catalyst value.The
+   * Protobuf definition is provided through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   * @since 3.5.0
+   */
+  @Experimental
+  def from_protobuf(data: Column, messageName: String, 
binaryFileDescriptorSet: Array[Byte])
+  : Column = {
+    new Column(ProtobufDataToCatalyst(data.expr, messageName, 
Some(binaryFileDescriptorSet)))
   }
 
   /**
@@ -121,14 +168,34 @@ object functions {
    * @param messageName
    *   the protobuf MessageName to look for in descriptor file.
    * @param descFilePath
-   *   the protobuf descriptor file.
+   *   The Protobuf descriptor file. This file is usually created using 
`protoc` with
+   *   `--descriptor_set_out` and `--include_imports` options.
    * @since 3.4.0
    */
   @Experimental
   def to_protobuf(data: Column, messageName: String, descFilePath: String): 
Column = {
-    new Column(CatalystDataToProtobuf(data.expr, messageName, 
Some(descFilePath)))
+    to_protobuf(data, messageName, descFilePath, Map.empty[String, 
String].asJava)
   }
 
+  /**
+   * Converts a column into binary of protobuf format.The Protobuf definition 
is provided
+   * through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   *
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_protobuf(data: Column, messageName: String, binaryFileDescriptorSet: 
Array[Byte])
+  : Column = {
+    new Column(CatalystDataToProtobuf(data.expr, messageName, 
Some(binaryFileDescriptorSet)))
+  }
   /**
    * Converts a column into binary of protobuf format. The Protobuf definition 
is provided
    * through Protobuf <i>descriptor file</i>.
@@ -148,8 +215,37 @@ object functions {
     messageName: String,
     descFilePath: String,
     options: java.util.Map[String, String]): Column = {
+    val fileContent = ProtobufUtils.readDescriptorFileContent(descFilePath)
+    new Column(
+      CatalystDataToProtobuf(data.expr, messageName, Some(fileContent), 
options.asScala.toMap)
+    )
+  }
+
+  /**
+   * Converts a column into binary of protobuf format.The Protobuf definition 
is provided
+   * through Protobuf `FileDescriptorSet`.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the protobuf MessageName to look for in the descriptor set.
+   * @param binaryFileDescriptorSet
+   *   Serialized Protobuf descriptor (`FileDescriptorSet`). Typically 
contents of file created
+   *   using `protoc` with `--descriptor_set_out` and `--include_imports` 
options.
+   * @param options
+   * @since 3.5.0
+   */
+  @Experimental
+  def to_protobuf(
+    data: Column,
+    messageName: String,
+    binaryFileDescriptorSet: Array[Byte],
+    options: java.util.Map[String, String]
+  ): Column = {
     new Column(
-      CatalystDataToProtobuf(data.expr, messageName, Some(descFilePath), 
options.asScala.toMap)
+      CatalystDataToProtobuf(
+        data.expr, messageName, Some(binaryFileDescriptorSet), 
options.asScala.toMap
+      )
     )
   }
 
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 5688a483da0..6f49abfd208 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.protobuf.utils
 
-import java.io.{BufferedInputStream, FileInputStream, IOException}
+import java.io.File
+import java.io.FileNotFoundException
+import java.nio.file.NoSuchFileException
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import com.google.protobuf.{DescriptorProtos, Descriptors, 
InvalidProtocolBufferException, Message}
 import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, 
FileDescriptorSet}
 import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
 import com.google.protobuf.TypeRegistry
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -139,15 +143,16 @@ private[sql] object ProtobufUtils extends Logging {
    * Builds Protobuf message descriptor either from the Java class or from 
serialized descriptor
    * read from the file.
    * @param messageName
-   *  Protobuf message name or Java class name.
-   * @param descFilePathOpt
-   *  When the file name set, the descriptor and it's dependencies are read 
from the file. Other
-   *  the `messageName` is treated as Java class name.
+   *  Protobuf message name or Java class name (when binaryFileDescriptorSet 
is None)..
+   * @param binaryFileDescriptorSet
+   *  When the binary `FileDescriptorSet` is provided, the descriptor and its 
dependencies are
+   *  read from it.
    * @return
    */
-  def buildDescriptor(messageName: String, descFilePathOpt: Option[String]): 
Descriptor = {
-    descFilePathOpt match {
-      case Some(filePath) => buildDescriptor(descFilePath = filePath, 
messageName)
+  def buildDescriptor(messageName: String, binaryFileDescriptorSet: 
Option[Array[Byte]])
+  : Descriptor = {
+    binaryFileDescriptorSet match {
+      case Some(bytes) => buildDescriptor(bytes, messageName)
       case None => buildDescriptorFromJavaClass(messageName)
     }
   }
@@ -208,9 +213,9 @@ private[sql] object ProtobufUtils extends Logging {
       .asInstanceOf[Descriptor]
   }
 
-  def buildDescriptor(descFilePath: String, messageName: String): Descriptor = 
{
+  def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: 
String): Descriptor = {
     // Find the first message descriptor that matches the name.
-    val descriptorOpt = parseFileDescriptorSet(descFilePath)
+    val descriptorOpt = parseFileDescriptorSet(binaryFileDescriptorSet)
       .flatMap { fileDesc =>
         fileDesc.getMessageTypes.asScala.find { desc =>
           desc.getName == messageName || desc.getFullName == messageName
@@ -223,28 +228,32 @@ private[sql] object ProtobufUtils extends Logging {
     }
   }
 
-  private def parseFileDescriptorSet(descFilePath: String): 
List[Descriptors.FileDescriptor] = {
-    var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
+  def readDescriptorFileContent(filePath: String): Array[Byte] = {
     try {
-      val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
-      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile)
+      FileUtils.readFileToByteArray(new File(filePath))
     } catch {
-      case ex: InvalidProtocolBufferException =>
-        throw QueryCompilationErrors.descriptorParseError(descFilePath, ex)
-      case ex: IOException =>
-        throw 
QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex)
+      case ex: FileNotFoundException =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)
+      case ex: NoSuchFileException =>
+        throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, 
ex)
+      case NonFatal(ex) => throw 
QueryCompilationErrors.descriptorParseError(ex)
     }
+  }
+
+  private def parseFileDescriptorSet(bytes: Array[Byte]): 
List[Descriptors.FileDescriptor] = {
+    var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
     try {
-      val fileDescriptorProtoIndex = 
createDescriptorProtoMap(fileDescriptorSet)
-      val fileDescriptorList: List[Descriptors.FileDescriptor] =
-        fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
-          buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
-        ).toList
-      fileDescriptorList
+      fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(bytes)
     } catch {
-      case e: Exception =>
-        throw 
QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e)
+      case ex: InvalidProtocolBufferException =>
+        throw QueryCompilationErrors.descriptorParseError(ex)
     }
+    val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet)
+    val fileDescriptorList: List[Descriptors.FileDescriptor] =
+      fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto =>
+        buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex)
+      ).toList
+    fileDescriptorList
   }
 
   /**
@@ -285,10 +294,10 @@ private[sql] object ProtobufUtils extends Logging {
     case n => s"field '${n.mkString(".")}'"
   }
 
-  /** Builds [[TypeRegistry]] with all the messages found in the descriptor 
file. */
-  private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry 
= {
+  /** Builds [[TypeRegistry]] with all the messages found in the descriptor 
set. */
+  private[protobuf] def buildTypeRegistry(descriptorBytes: Array[Byte]): 
TypeRegistry = {
     val registryBuilder = TypeRegistry.newBuilder()
-    for (fileDesc <- parseFileDescriptorSet(descFilePath)) {
+    for (fileDesc <- parseFileDescriptorSet(descriptorBytes)) {
       registryBuilder.add(fileDesc.getMessageTypes)
     }
     registryBuilder.build()
diff --git 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 3e9273835e3..fedb06103dc 100644
--- 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -37,7 +37,8 @@ class ProtobufCatalystDataConversionSuite
     with ExpressionEvalHelper
     with ProtobufTestBase {
 
-  private val testFileDesc = testFile("catalyst_types.desc", 
"protobuf/catalyst_types.desc")
+  private val testFileDescFile = testFile("catalyst_types.desc", 
"protobuf/catalyst_types.desc")
+  private val testFileDesc = 
ProtobufUtils.readDescriptorFileContent(testFileDescFile)
   private val javaClassNamePrefix = 
"org.apache.spark.sql.protobuf.protos.CatalystTypes$"
 
   private def checkResultWithEval(
@@ -45,22 +46,21 @@ class ProtobufCatalystDataConversionSuite
       descFilePath: String,
       messageName: String,
       expected: Any): Unit = {
-
+    val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath)
     withClue("(Eval check with Java class name)") {
       val className = s"$javaClassNamePrefix$messageName"
       checkEvaluation(
         ProtobufDataToCatalyst(
           CatalystDataToProtobuf(data, className),
-          className,
-          descFilePath = None),
+          className),
         prepareExpectedResult(expected))
     }
     withClue("(Eval check with descriptor file)") {
       checkEvaluation(
         ProtobufDataToCatalyst(
-          CatalystDataToProtobuf(data, messageName, Some(descFilePath)),
+          CatalystDataToProtobuf(data, messageName, Some(descBytes)),
           messageName,
-          descFilePath = Some(descFilePath)),
+          Some(descBytes)),
         prepareExpectedResult(expected))
     }
   }
@@ -71,15 +71,16 @@ class ProtobufCatalystDataConversionSuite
       actualSchema: String,
       badSchema: String): Unit = {
 
-    val binary = CatalystDataToProtobuf(data, actualSchema, Some(descFilePath))
+    val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath)
+    val binary = CatalystDataToProtobuf(data, actualSchema, Some(descBytes))
 
     intercept[Exception] {
-      ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" 
-> "FAILFAST"))
+      ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> 
"FAILFAST"))
         .eval()
     }
 
     val expected = {
-      val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, 
badSchema)
+      val expectedSchema = ProtobufUtils.buildDescriptor(descBytes, badSchema)
       SchemaConverters.toSqlType(expectedSchema).dataType match {
         case st: StructType =>
           Row.fromSeq((0 until st.length).map { _ =>
@@ -90,7 +91,7 @@ class ProtobufCatalystDataConversionSuite
     }
 
     checkEvaluation(
-      ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" 
-> "PERMISSIVE")),
+      ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> 
"PERMISSIVE")),
       expected)
   }
 
@@ -145,20 +146,20 @@ class ProtobufCatalystDataConversionSuite
 
       checkResultWithEval(
         input,
-        testFileDesc,
+        testFileDescFile,
         messageName,
         input.eval())
     }
   }
 
   private def checkDeserialization(
-      descFilePath: String,
+      descFileBytes: Array[Byte],
       messageName: String,
       data: Message,
       expected: Option[Any],
       filters: StructFilters = new NoopFilters): Unit = {
 
-    val descriptor = ProtobufUtils.buildDescriptor(descFilePath, messageName)
+    val descriptor = ProtobufUtils.buildDescriptor(descFileBytes, messageName)
     val dataType = SchemaConverters.toSqlType(descriptor).dataType
 
     val deserializer = new ProtobufDeserializer(descriptor, dataType, filters)
@@ -195,7 +196,7 @@ class ProtobufCatalystDataConversionSuite
       val data = RandomDataGenerator.randomRow(new scala.util.Random(seed), 
actualSchema)
       val converter = 
CatalystTypeConverters.createToCatalystConverter(actualSchema)
       val input = Literal.create(converter(data), actualSchema)
-      checkUnsupportedRead(input, testFileDesc, "Actual", "Bad")
+      checkUnsupportedRead(input, testFileDescFile, "Actual", "Bad")
     }
   }
 
diff --git 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 7e6cf0a3c96..f0979209f86 100644
--- 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -40,10 +40,12 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
 
   import testImplicits._
 
-  val testFileDesc = testFile("functions_suite.desc", 
"protobuf/functions_suite.desc")
+  val testFileDescFile = testFile("functions_suite.desc", 
"protobuf/functions_suite.desc")
+  private val testFileDesc = 
ProtobufUtils.readDescriptorFileContent(testFileDescFile)
   private val javaClassNamePrefix = 
"org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$"
 
-  val proto2FileDesc = testFile("proto2_messages.desc", 
"protobuf/proto2_messages.desc")
+  val proto2FileDescFile = testFile("proto2_messages.desc", 
"protobuf/proto2_messages.desc")
+  val proto2FileDesc = 
ProtobufUtils.readDescriptorFileContent(proto2FileDescFile)
   private val proto2JavaClassNamePrefix = 
"org.apache.spark.sql.protobuf.protos.Proto2Messages$"
 
   private def emptyBinaryDF = Seq(Array[Byte]()).toDF("binary")
@@ -52,7 +54,7 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
    * Runs the given closure twice. Once with descriptor file and second time 
with Java class name.
    */
   private def checkWithFileAndClassName(messageName: String)(
-    fn: (String, Option[String]) => Unit): Unit = {
+    fn: (String, Option[Array[Byte]]) => Unit): Unit = {
       withClue("(With descriptor file)") {
         fn(messageName, Some(testFileDesc))
       }
@@ -62,7 +64,7 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
   }
 
   private def checkWithProto2FileAndClassName(messageName: String)(
-    fn: (String, Option[String]) => Unit): Unit = {
+    fn: (String, Option[Array[Byte]]) => Unit): Unit = {
     withClue("(With descriptor file)") {
       fn(messageName, Some(proto2FileDesc))
     }
@@ -75,11 +77,11 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
   private def from_protobuf_wrapper(
     col: Column,
     messageName: String,
-    descFilePathOpt: Option[String],
+    descBytesOpt: Option[Array[Byte]],
     options: Map[String, String] = Map.empty): Column = {
-    descFilePathOpt match {
-      case Some(descFilePath) => functions.from_protobuf(
-        col, messageName, descFilePath, options.asJava
+    descBytesOpt match {
+      case Some(descBytes) => functions.from_protobuf(
+        col, messageName, descBytes, options.asJava
       )
       case None => functions.from_protobuf(col, messageName, options.asJava)
     }
@@ -87,9 +89,9 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
 
   // A wrapper to invoke the right variable of to_protobuf() depending on 
arguments.
   private def to_protobuf_wrapper(
-    col: Column, messageName: String, descFilePathOpt: Option[String]): Column 
= {
-    descFilePathOpt match {
-      case Some(descFilePath) => functions.to_protobuf(col, messageName, 
descFilePath)
+    col: Column, messageName: String, descBytesOpt: Option[Array[Byte]]): 
Column = {
+    descBytesOpt match {
+      case Some(descBytes) => functions.to_protobuf(col, messageName, 
descBytes)
       case None => functions.to_protobuf(col, messageName)
     }
   }
@@ -116,11 +118,11 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
         lit("0".getBytes).as("bytes_value")).as("SimpleMessage"))
 
     checkWithFileAndClassName("SimpleMessage") {
-      case (name, descFilePathOpt) =>
+      case (name, descBytesOpt) =>
         val protoStructDF = df.select(
-          to_protobuf_wrapper($"SimpleMessage", name, 
descFilePathOpt).as("proto"))
+          to_protobuf_wrapper($"SimpleMessage", name, 
descBytesOpt).as("proto"))
         val actualDf = protoStructDF.select(
-          from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*"))
+          from_protobuf_wrapper($"proto", name, descBytesOpt).as("proto.*"))
         checkAnswer(actualDf, df)
     }
   }
@@ -460,9 +462,11 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
   }
 
   test("Handle extra fields : oldProducer -> newConsumer") {
-    val testFileDesc = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc")
-    val oldProducer = ProtobufUtils.buildDescriptor(testFileDesc, "oldProducer")
-    val newConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "newConsumer")
+    val catalystTypesFile = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc")
+    val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile)
+
+    val oldProducer = ProtobufUtils.buildDescriptor(descBytes, "oldProducer")
+    val newConsumer = ProtobufUtils.buildDescriptor(descBytes, "newConsumer")
 
     val oldProducerMessage = DynamicMessage
       .newBuilder(oldProducer)
@@ -472,17 +476,17 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     val df = Seq(oldProducerMessage.toByteArray).toDF("oldProducerData")
     val fromProtoDf = df.select(
       functions
-        .from_protobuf($"oldProducerData", "newConsumer", testFileDesc)
+        .from_protobuf($"oldProducerData", "newConsumer", catalystTypesFile)
         .as("fromProto"))
 
     val toProtoDf = fromProtoDf.select(
       functions
-        .to_protobuf($"fromProto", "newConsumer", testFileDesc)
+        .to_protobuf($"fromProto", "newConsumer", descBytes)
         .as("toProto"))
 
     val toProtoDfToFromProtoDf = toProtoDf.select(
       functions
-        .from_protobuf($"toProto", "newConsumer", testFileDesc)
+        .from_protobuf($"toProto", "newConsumer", descBytes)
         .as("toProtoToFromProto"))
 
     val actualFieldNames =
@@ -500,9 +504,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
   }
 
   test("Handle extra fields : newProducer -> oldConsumer") {
-    val testFileDesc = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc")
-    val newProducer = ProtobufUtils.buildDescriptor(testFileDesc, "newProducer")
-    val oldConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "oldConsumer")
+    val catalystTypesFile = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc")
+    val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile)
+
+    val newProducer = ProtobufUtils.buildDescriptor(descBytes, "newProducer")
+    val oldConsumer = ProtobufUtils.buildDescriptor(descBytes, "oldConsumer")
 
     val newProducerMessage = DynamicMessage
       .newBuilder(newProducer)
@@ -513,7 +519,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     val df = Seq(newProducerMessage.toByteArray).toDF("newProducerData")
     val fromProtoDf = df.select(
       functions
-        .from_protobuf($"newProducerData", "oldConsumer", testFileDesc)
+        .from_protobuf($"newProducerData", "oldConsumer", catalystTypesFile)
         .as("oldConsumerProto"))
 
     val expectedFieldNames = oldConsumer.getFields.asScala.map(f => f.getName)
@@ -691,8 +697,8 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
     }
     checkError(
       exception = e,
-      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
-      parameters = Map("descFilePath" -> testFileDescriptor))
+      errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND",
+      parameters = Map("dependencyName" -> "nestedenum.proto"))
   }
 
   test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> 
from_protobuf") {
@@ -1042,7 +1048,6 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       parameters = Map("filePath" -> "/non/existent/path.desc")
     )
     assert(ex.getCause != null)
-    assert(ex.getCause.getMessage.matches(".*No such file.*"), ex.getCause.getMessage())
   }
 
   test("Recursive fields in arrays and maps") {
@@ -1432,16 +1437,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
       ).as("proto")
     )
 
-    checkWithProto2FileAndClassName("Proto2AllTypes") { case (name, descFilePathOpt) =>
+    checkWithProto2FileAndClassName("Proto2AllTypes") { case (name, descBytesOpt) =>
       checkAnswer(
         explicitZero.select(
-          from_protobuf_wrapper($"raw_proto", name, descFilePathOpt).as("proto")),
+          from_protobuf_wrapper($"raw_proto", name, descBytesOpt).as("proto")),
         expected)
       checkAnswer(
         explicitZero.select(from_protobuf_wrapper(
           $"raw_proto",
           name,
-          descFilePathOpt,
+          descBytesOpt,
           Map("emit.default.values" -> "true")).as("proto")),
         expected)
     }
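
The wrapper changes above exercise the new overloads that accept the descriptor as bytes instead of a file path. As a rough usage sketch (the path, the DataFrame `df`, and the column and message names below are placeholders, not part of this change):

    import java.nio.file.{Files, Paths}
    import java.util.Collections
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

    // Read the serialized FileDescriptorSet once on the driver; only these bytes
    // travel with the query plan, so executors never re-read the file.
    val descBytes: Array[Byte] = Files.readAllBytes(Paths.get("/tmp/messages.desc"))

    // df is assumed to have a binary column "payload" holding protobuf-encoded rows.
    val parsed = df.select(
      from_protobuf(col("payload"), "SimpleMessage", descBytes,
        Collections.emptyMap[String, String]()).as("msg"))

    // Round-trip back to protobuf bytes with the same descriptor bytes.
    val reencoded = parsed.select(
      to_protobuf(col("msg"), "SimpleMessage", descBytes).as("payload"))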
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index 356cd20eb4e..8e9ff92621c 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -36,10 +36,13 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase {
   import ProtoSerdeSuite._
   import ProtoSerdeSuite.MatchType._
 
-  val testFileDesc = testFile("serde_suite.desc", "protobuf/serde_suite.desc")
+  private val testFileDescFile = testFile("serde_suite.desc", "protobuf/serde_suite.desc")
+  private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile)
+
   private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$"
 
-  val proto2Desc = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc")
+  private val proto2DescFile = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc")
+  private val proto2Desc = ProtobufUtils.readDescriptorFileContent(proto2DescFile)
 
   test("Test basic conversion") {
     withFieldMatchType { fieldMatch =>
@@ -209,25 +212,32 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase {
 
   test("raise cannot parse and construct protobuf descriptor error") {
     // passing serde_suite.proto instead of serde_suite.desc
-    var testFileDesc = testFile("serde_suite.proto", "protobuf/serde_suite.proto")
+    var fileDescFile = testFile("serde_suite.proto", "protobuf/serde_suite.proto")
+
     val e1 = intercept[AnalysisException] {
-      ProtobufUtils.buildDescriptor(testFileDesc, "SerdeBasicMessage")
+      ProtobufUtils.buildDescriptor(
+        ProtobufUtils.readDescriptorFileContent(fileDescFile),
+        "SerdeBasicMessage"
+      )
     }
 
     checkError(
       exception = e1,
-      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
-      parameters = Map("descFilePath" -> testFileDesc))
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR")
+
+    fileDescFile =
+      testFile("basicmessage_noimports.desc", "protobuf/basicmessage_noimports.desc")
 
-    testFileDesc = testFile("basicmessage_noimports.desc", "protobuf/basicmessage_noimports.desc")
     val e2 = intercept[AnalysisException] {
-      ProtobufUtils.buildDescriptor(testFileDesc, "SerdeBasicMessage")
+      ProtobufUtils.buildDescriptor(
+        ProtobufUtils.readDescriptorFileContent(fileDescFile),
+        "SerdeBasicMessage")
     }
 
     checkError(
       exception = e2,
-      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
-      parameters = Map("descFilePath" -> testFileDesc))
+      errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND",
+      parameters = Map("dependencyName" -> "nestedenum.proto"))
   }
 
   /**
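
The helper pattern used by the updated suites above, as a standalone sketch (the path and message name are placeholders; `ProtobufUtils` is an internal Spark utility, so this mirrors the test code rather than a public API):

    import org.apache.spark.sql.protobuf.utils.ProtobufUtils

    // Read the descriptor file once into serialized FileDescriptorSet bytes...
    val descBytes: Array[Byte] =
      ProtobufUtils.readDescriptorFileContent("/tmp/serde_suite.desc")

    // ...then resolve message descriptors from those bytes, with no further file I/O.
    val descriptor = ProtobufUtils.buildDescriptor(descBytes, "SerdeBasicMessage")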
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index f69832efe35..169ef86ad53 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -54,11 +54,6 @@
     ],
     "sqlState" : "42846"
   },
-  "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : {
-    "message" : [
-      "Error constructing FileDescriptor for <descFilePath>."
-    ]
-  },
   "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : {
     "message" : [
       "Cannot convert Protobuf <protobufColumn> to SQL <sqlColumn> because 
schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)."
@@ -132,7 +127,7 @@
   },
   "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : {
     "message" : [
-      "Error parsing file <descFilePath> descriptor byte[] into Descriptor 
object."
+      "Error parsing descriptor bytes into Protobuf FileDescriptorSet."
     ]
   },
   "CANNOT_PARSE_TIMESTAMP" : {
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index b9e9a2ed997..13068729539 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -59,12 +59,6 @@ Ambiguous reference to the field `<field>`. It appears `<count>` times in the sc
 
 Cannot cast `<sourceType>` to `<targetType>`.
 
-### CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR
-
-SQLSTATE: none assigned
-
-Error constructing FileDescriptor for `<descFilePath>`.
-
 ### CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE
 
 SQLSTATE: none assigned
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 4b004eb8fd1..476149f42d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3363,10 +3363,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("messageName" -> messageName))
   }
 
-  def descriptorParseError(descFilePath: String, cause: Throwable): Throwable = {
+  def descriptorParseError(cause: Throwable): Throwable = {
     new AnalysisException(
       errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
-      messageParameters = Map("descFilePath" -> descFilePath),
+      messageParameters = Map.empty,
       cause = Option(cause))
   }
 
@@ -3377,13 +3377,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       cause = Option(cause))
   }
 
-  def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
-    new AnalysisException(
-      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
-      messageParameters = Map("descFilePath" -> descFilePath),
-      cause = Option(cause))
-  }
-
   def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
     new AnalysisException(
       errorClass = "RECURSIVE_PROTOBUF_SCHEMA",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
