This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 64642b48351 [SPARK-43530][PROTOBUF] Read descriptor file only once 64642b48351 is described below commit 64642b48351c0c4ef8f40ce7902b85b6f953bd8f Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Sat May 27 23:18:54 2023 +0800 [SPARK-43530][PROTOBUF] Read descriptor file only once ### What changes were proposed in this pull request? Protobuf functions (`from_protobuf()` & `to_protobuf()`) take the file path of a descriptor file and use that for constructing Protobuf descriptors. The main problem with this is that the file is read many times (e.g. at each executor). This is unnecessary and error-prone. E.g. the file contents may be updated a couple of days after a streaming query starts. That could lead to various errors. **The fix**: Use the byte content (which is a serialized `FileDescriptorSet` proto). We read the content from the file once and carry the byte buffer. This also adds a new API where we can pass the byte buffer directly. This is useful when the users fetch the content themselves and pass it to Protobuf functions. E.g. they could fetch it from S3, or extract it from Python Protobuf classes. **Note to reviewers**: This includes a lot of updates to test files, mainly because of the interface change to pass the buffer. I have left a few PR comments to help with the review. ### Why are the changes needed? Described above. ### Does this PR introduce _any_ user-facing change? Yes, this adds two new versions of the `from_protobuf()` and `to_protobuf()` API that take Protobuf bytes rather than a file path. ### How was this patch tested? - Unit tests Closes #41192 from rangadi/proto-file-buffer. 
Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../org/apache/spark/sql/protobuf/functions.scala | 135 +++++++++++++++++++-- .../org/apache/spark/sql/FunctionTestSuite.scala | 12 +- .../apache/spark/sql/PlanGenerationTestSuite.scala | 7 +- ..._protobuf_messageClassName_descFilePath.explain | 2 +- ...f_messageClassName_descFilePath_options.explain | 2 +- ..._protobuf_messageClassName_descFilePath.explain | 2 +- ...f_messageClassName_descFilePath_options.explain | 2 +- ...rom_protobuf_messageClassName_descFilePath.json | 2 +- ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 156 -> 361 bytes ...obuf_messageClassName_descFilePath_options.json | 2 +- ...messageClassName_descFilePath_options.proto.bin | Bin 206 -> 409 bytes .../to_protobuf_messageClassName_descFilePath.json | 2 +- ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 154 -> 359 bytes ...obuf_messageClassName_descFilePath_options.json | 2 +- ...messageClassName_descFilePath_options.proto.bin | Bin 204 -> 407 bytes .../sql/connect/planner/SparkConnectPlanner.scala | 33 ++--- .../sql/connect/ProtoToParsedPlanTestSuite.scala | 4 +- .../sql/protobuf/CatalystDataToProtobuf.scala | 7 +- .../sql/protobuf/ProtobufDataToCatalyst.scala | 14 +-- .../org/apache/spark/sql/protobuf/functions.scala | 114 +++++++++++++++-- .../spark/sql/protobuf/utils/ProtobufUtils.scala | 67 +++++----- .../ProtobufCatalystDataConversionSuite.scala | 29 ++--- .../sql/protobuf/ProtobufFunctionsSuite.scala | 65 +++++----- .../spark/sql/protobuf/ProtobufSerdeSuite.scala | 30 +++-- core/src/main/resources/error/error-classes.json | 7 +- docs/sql-error-conditions.md | 6 - .../spark/sql/errors/QueryCompilationErrors.scala | 11 +- 27 files changed, 393 insertions(+), 164 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala 
index c42f8417155..57ce013065e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala @@ -16,10 +16,19 @@ */ package org.apache.spark.sql.protobuf +import java.io.File +import java.io.FileNotFoundException +import java.nio.file.NoSuchFileException +import java.util.Collections + import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.commons.io.FileUtils import org.apache.spark.annotation.Experimental import org.apache.spark.sql.Column +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.functions.{fnWithOptions, lit} // scalastyle:off: object.name @@ -35,7 +44,8 @@ object functions { * @param messageName * the protobuf message name to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @param options * @since 3.5.0 */ @@ -45,12 +55,36 @@ object functions { messageName: String, descFilePath: String, options: java.util.Map[String, String]): Column = { + val binaryFileDescSet = readDescriptorFileContent(descFilePath) + from_protobuf(data, messageName, binaryFileDescSet, options) + } + + /** + * Converts a binary column of Protobuf format into its corresponding catalyst value.The + * Protobuf definition is provided through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. 
+ * @param options + * @since 3.5.0 + */ + @Experimental + def from_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte], + options: java.util.Map[String, String]): Column = { fnWithOptions( "from_protobuf", options.asScala.iterator, data, lit(messageName), - lit(descFilePath)) + lit(binaryFileDescriptorSet)) } /** @@ -62,12 +96,13 @@ object functions { * @param messageName * the protobuf MessageName to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @since 3.5.0 */ @Experimental def from_protobuf(data: Column, messageName: String, descFilePath: String): Column = { - Column.fn("from_protobuf", data, lit(messageName), lit(descFilePath)) + from_protobuf(data, messageName, descFilePath, emptyOptions) } /** @@ -90,6 +125,27 @@ object functions { Column.fn("from_protobuf", data, lit(messageClassName)) } + /** + * Converts a binary column of Protobuf format into its corresponding catalyst value.The + * Protobuf definition is provided through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. + * @since 3.5.0 + */ + @Experimental + def from_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte]): Column = { + from_protobuf(data, messageName, binaryFileDescriptorSet, emptyOptions) + } + /** * Converts a binary column of Protobuf format into its corresponding catalyst value. * `messageClassName` points to Protobuf Java class. 
The jar containing Java class should be @@ -123,12 +179,35 @@ object functions { * @param messageName * the protobuf MessageName to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @since 3.5.0 */ @Experimental def to_protobuf(data: Column, messageName: String, descFilePath: String): Column = { - Column.fn("to_protobuf", data, lit(messageName), lit(descFilePath)) + to_protobuf(data, messageName, descFilePath, emptyOptions) + } + + /** + * Converts a column into binary of protobuf format.The Protobuf definition is provided through + * Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. + * + * @since 3.5.0 + */ + @Experimental + def to_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte]): Column = { + to_protobuf(data, messageName, binaryFileDescriptorSet, emptyOptions) } /** @@ -140,7 +219,8 @@ object functions { * @param messageName * the protobuf MessageName to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. 
* @param options * @since 3.5.0 */ @@ -150,12 +230,36 @@ object functions { messageName: String, descFilePath: String, options: java.util.Map[String, String]): Column = { + val binaryFileDescriptorSet = readDescriptorFileContent(descFilePath) + to_protobuf(data, messageName, binaryFileDescriptorSet, options) + } + + /** + * Converts a column into binary of protobuf format.The Protobuf definition is provided through + * Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. + * @param options + * @since 3.5.0 + */ + @Experimental + def to_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte], + options: java.util.Map[String, String]): Column = { fnWithOptions( "to_protobuf", options.asScala.iterator, data, lit(messageName), - lit(descFilePath)) + lit(binaryFileDescriptorSet)) } /** @@ -199,4 +303,19 @@ object functions { options: java.util.Map[String, String]): Column = { fnWithOptions("to_protobuf", options.asScala.iterator, data, lit(messageClassName)) } + + private def emptyOptions: java.util.Map[String, String] = Collections.emptyMap[String, String]() + + // This method is copied from org.apache.spark.sql.protobuf.util.ProtobufUtils + private def readDescriptorFileContent(filePath: String): Array[Byte] = { + try { + FileUtils.readFileToByteArray(new File(filePath)) + } catch { + case ex: FileNotFoundException => + throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, ex) + case ex: NoSuchFileException => + throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, ex) + case NonFatal(ex) => throw QueryCompilationErrors.descriptorParseError(ex) + } + } } diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala index c92f77e8144..32004b6bcc1 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala @@ -238,12 +238,16 @@ class FunctionTestSuite extends ConnectFunSuite { Collections.emptyMap[String, String])) testEquals( "from_protobuf", - pbFn.from_protobuf(a, "FakeMessage", "fakePath", Map.empty[String, String].asJava), - pbFn.from_protobuf(a, "FakeMessage", "fakePath")) + pbFn.from_protobuf( + a, + "FakeMessage", + "fakeBytes".getBytes(), + Map.empty[String, String].asJava), + pbFn.from_protobuf(a, "FakeMessage", "fakeBytes".getBytes())) testEquals( "to_protobuf", - pbFn.to_protobuf(a, "FakeMessage", "fakePath", Map.empty[String, String].asJava), - pbFn.to_protobuf(a, "FakeMessage", "fakePath")) + pbFn.to_protobuf(a, "FakeMessage", "fakeBytes".getBytes(), Map.empty[String, String].asJava), + pbFn.to_protobuf(a, "FakeMessage", "fakeBytes".getBytes())) test("assert_true no message") { val e = assert_true(a).expr diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 94b9adda655..607db2ee086 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder import org.apache.spark.sql.connect.client.SparkConnectClient import org.apache.spark.sql.connect.client.util.ConnectFunSuite +import 
org.apache.spark.sql.connect.client.util.IntegrationTestUtils import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.lit import org.apache.spark.sql.protobuf.{functions => pbFn} @@ -2255,10 +2256,8 @@ class PlanGenerationTestSuite // 1. cd connector/connect/common/src/main/protobuf/spark/connect // 2. protoc --include_imports --descriptor_set_out=../../../../test/resources/protobuf-tests/common.desc common.proto // scalastyle:on line.size.limit - private val testDescFilePath: String = java.nio.file.Paths - .get("../", "common", "src", "test", "resources", "protobuf-tests") - .resolve("common.desc") - .toString + private val testDescFilePath: String = s"${IntegrationTestUtils.sparkHome}/connector/" + + "connect/common/src/test/resources/protobuf-tests/common.desc" test("from_protobuf messageClassName") { binary.select(pbFn.from_protobuf(fn.col("bytes"), classOf[StorageLevel].getName)) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain index b3eaa5f44d6..6eb4805b4fc 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath.explain @@ -1,2 +1,2 @@ -Project [from_protobuf(bytes#0, StorageLevel, Some(../common/src/test/resources/protobuf-tests/common.desc)) AS from_protobuf(bytes)#0] +Project [from_protobuf(bytes#0, StorageLevel, Some([B)) AS from_protobuf(bytes)#0] +- LocalRelation <empty>, [id#0L, bytes#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain 
b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain index 85851e14da3..c4a47b1aef0 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/from_protobuf_messageClassName_descFilePath_options.explain @@ -1,2 +1,2 @@ -Project [from_protobuf(bytes#0, StorageLevel, Some(../common/src/test/resources/protobuf-tests/common.desc), (recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0] +Project [from_protobuf(bytes#0, StorageLevel, Some([B), (recursive.fields.max.depth,2)) AS from_protobuf(bytes)#0] +- LocalRelation <empty>, [id#0L, bytes#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain index 55b53cfcdb1..7c688cc4469 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath.explain @@ -1,2 +1,2 @@ -Project [to_protobuf(bytes#0, StorageLevel, Some(../common/src/test/resources/protobuf-tests/common.desc)) AS to_protobuf(bytes)#0] +Project [to_protobuf(bytes#0, StorageLevel, Some([B)) AS to_protobuf(bytes)#0] +- LocalRelation <empty>, [id#0L, bytes#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain index eafd4124d8f..9f05bb03c9c 100644 --- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to_protobuf_messageClassName_descFilePath_options.explain @@ -1,2 +1,2 @@ -Project [to_protobuf(bytes#0, StorageLevel, Some(../common/src/test/resources/protobuf-tests/common.desc), (recursive.fields.max.depth,2)) AS to_protobuf(bytes)#0] +Project [to_protobuf(bytes#0, StorageLevel, Some([B), (recursive.fields.max.depth,2)) AS to_protobuf(bytes)#0] +- LocalRelation <empty>, [id#0L, bytes#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json index 05e210434cd..375c0f9324c 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.json @@ -24,7 +24,7 @@ } }, { "literal": { - "string": "../common/src/test/resources/protobuf-tests/common.desc" + "binary": "CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z" } }] } diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin index c4c2b2126b8..07d4c6c5b28 100644 Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json index ff747ede6f1..db9371b64ef 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.json @@ -24,7 +24,7 @@ } }, { "literal": { - "string": "../common/src/test/resources/protobuf-tests/common.desc" + "binary": "CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z" } }, { "unresolvedFunction": { diff --git a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin index 640df1c8d1b..00fd58da6be 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/from_protobuf_messageClassName_descFilePath_options.proto.bin differ diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json index e5bbb2ba57d..0843b469384 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.json @@ -24,7 +24,7 @@ } }, { "literal": { - "string": "../common/src/test/resources/protobuf-tests/common.desc" + "binary": "CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z" } }] } diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin index 346fc3d90f3..c3fe14aef47 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json index 183c8a25ce1..76307b3141f 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json +++ 
b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.json @@ -24,7 +24,7 @@ } }, { "literal": { - "string": "../common/src/test/resources/protobuf-tests/common.desc" + "binary": "CvwBCgxjb21tb24ucHJvdG8SDXNwYXJrLmNvbm5lY3QisAEKDFN0b3JhZ2VMZXZlbBIZCgh1c2VfZGlzaxgBIAEoCFIHdXNlRGlzaxIdCgp1c2VfbWVtb3J5GAIgASgIUgl1c2VNZW1vcnkSIAoMdXNlX29mZl9oZWFwGAMgASgIUgp1c2VPZmZIZWFwEiIKDGRlc2VyaWFsaXplZBgEIAEoCFIMZGVzZXJpYWxpemVkEiAKC3JlcGxpY2F0aW9uGAUgASgFUgtyZXBsaWNhdGlvbkIiCh5vcmcuYXBhY2hlLnNwYXJrLmNvbm5lY3QucHJvdG9QAWIGcHJvdG8z" } }, { "unresolvedFunction": { diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin index cc4d08ee581..a387611c1ad 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/to_protobuf_messageClassName_descFilePath_options.proto.bin differ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index be13578d19c..10528843e40 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1451,32 +1451,33 @@ class SparkConnectPlanner(val session: SparkSession) { def extractArgsOfProtobufFunction( functionName: String, argumentsCount: Int, - children: Seq[Expression]): (String, Option[String], Map[String, String]) = { + children: Seq[Expression]): (String, Option[Array[Byte]], Map[String, String]) = { val 
messageClassName = children(1) match { case Literal(s, StringType) if s != null => s.toString case other => throw InvalidPlanInput( s"MessageClassName in $functionName should be a literal string, but got $other") } - val (descFilePathOpt, options) = if (argumentsCount == 2) { + val (binaryFileDescSetOpt, options) = if (argumentsCount == 2) { (None, Map.empty[String, String]) } else if (argumentsCount == 3) { children(2) match { - case Literal(s, StringType) if s != null => - (Some(s.toString), Map.empty[String, String]) + case Literal(b, BinaryType) if b != null => + (Some(b.asInstanceOf[Array[Byte]]), Map.empty[String, String]) case UnresolvedFunction(Seq("map"), arguments, _, _, _) => (None, ExprUtils.convertToMapData(CreateMap(arguments))) case other => throw InvalidPlanInput( - s"The valid type for the 3rd arg in $functionName is string or map, but got $other") + s"The valid type for the 3rd arg in $functionName " + + s"is binary or map, but got $other") } } else if (argumentsCount == 4) { - val filePathOpt = children(2) match { - case Literal(s, StringType) if s != null => - Some(s.toString) + val fileDescSetOpt = children(2) match { + case Literal(b, BinaryType) if b != null => + Some(b.asInstanceOf[Array[Byte]]) case other => throw InvalidPlanInput( - s"DescFilePath in $functionName should be a literal string, but got $other") + s"DescFilePath in $functionName should be a literal binary, but got $other") } val map = children(3) match { case UnresolvedFunction(Seq("map"), arguments, _, _, _) => @@ -1485,12 +1486,12 @@ class SparkConnectPlanner(val session: SparkSession) { throw InvalidPlanInput( s"Options in $functionName should be created by map, but got $other") } - (filePathOpt, map) + (fileDescSetOpt, map) } else { throw InvalidPlanInput( s"$functionName requires 2 ~ 4 arguments, but got $argumentsCount ones!") } - (messageClassName, descFilePathOpt, options) + (messageClassName, binaryFileDescSetOpt, options) } fun.getFunctionName match { @@ -1648,15 
+1649,17 @@ class SparkConnectPlanner(val session: SparkSession) { // Protobuf-specific functions case "from_protobuf" if Seq(2, 3, 4).contains(fun.getArgumentsCount) => val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression) - val (messageClassName, descFilePathOpt, options) = + val (messageClassName, binaryFileDescSetOpt, options) = extractArgsOfProtobufFunction("from_protobuf", fun.getArgumentsCount, children) - Some(ProtobufDataToCatalyst(children.head, messageClassName, descFilePathOpt, options)) + Some( + ProtobufDataToCatalyst(children.head, messageClassName, binaryFileDescSetOpt, options)) case "to_protobuf" if Seq(2, 3, 4).contains(fun.getArgumentsCount) => val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression) - val (messageClassName, descFilePathOpt, options) = + val (messageClassName, binaryFileDescSetOpt, options) = extractArgsOfProtobufFunction("to_protobuf", fun.getArgumentsCount, children) - Some(CatalystDataToProtobuf(children.head, messageClassName, descFilePathOpt, options)) + Some( + CatalystDataToProtobuf(children.head, messageClassName, binaryFileDescSetOpt, options)) case _ => None } diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala index 64cbfbb6952..1d8e8446c45 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala @@ -195,7 +195,9 @@ class ProtoToParsedPlanTestSuite } private def removeMemoryAddress(expr: String): String = { - expr.replaceAll("@[0-9a-f]+,", ",") + expr + .replaceAll("@[0-9a-f]+,", ",") + .replaceAll("@[0-9a-f]+\\)", ")") } private def readRelation(path: Path): proto.Relation = { diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala index 62d251fb869..8805d935093 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/CatalystDataToProtobuf.scala @@ -26,14 +26,17 @@ import org.apache.spark.sql.types.{BinaryType, DataType} private[sql] case class CatalystDataToProtobuf( child: Expression, messageName: String, - descFilePath: Option[String] = None, + binaryFileDescriptorSet: Option[Array[Byte]] = None, options: Map[String, String] = Map.empty) extends UnaryExpression { + // TODO(SPARK-43578): binaryFileDescriptorSet could be very large in some cases. It is better + // to broadcast it so that it is not transferred with each task. + override def dataType: DataType = BinaryType @transient private lazy val protoDescriptor = - ProtobufUtils.buildDescriptor(messageName, descFilePathOpt = descFilePath) + ProtobufUtils.buildDescriptor(messageName, binaryFileDescriptorSet) @transient private lazy val serializer = new ProtobufSerializer(child.dataType, protoDescriptor, child.nullable) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index 128169f46a2..5c4a5ff0689 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, Struc private[sql] case class ProtobufDataToCatalyst( child: Expression, messageName: String, - descFilePath: Option[String] = None, + binaryFileDescriptorSet: Option[Array[Byte]] = None, options: 
Map[String, String] = Map.empty) extends UnaryExpression with ExpectsInputTypes { @@ -55,19 +55,15 @@ private[sql] case class ProtobufDataToCatalyst( private lazy val protobufOptions = ProtobufOptions(options) @transient private lazy val messageDescriptor = - ProtobufUtils.buildDescriptor(messageName, descFilePath) - // TODO: Avoid carrying the file name. Read the contents of descriptor file only once - // at the start. Rest of the runs should reuse the buffer. Otherwise, it could - // cause inconsistencies if the file contents are changed the user after a few days. - // Same for the write side in [[CatalystDataToProtobuf]]. + ProtobufUtils.buildDescriptor(messageName, binaryFileDescriptorSet) @transient private lazy val fieldsNumbers = messageDescriptor.getFields.asScala.map(f => f.getNumber).toSet @transient private lazy val deserializer = { - val typeRegistry = descFilePath match { - case Some(path) if protobufOptions.convertAnyFieldsToJson => - ProtobufUtils.buildTypeRegistry(path) // This loads all the messages in the file. + val typeRegistry = binaryFileDescriptorSet match { + case Some(descBytes) if protobufOptions.convertAnyFieldsToJson => + ProtobufUtils.buildTypeRegistry(descBytes) // This loads all the messages in the desc set. case None if protobufOptions.convertAnyFieldsToJson => ProtobufUtils.buildTypeRegistry(messageDescriptor) // Loads only connected messages. case _ => TypeRegistry.getEmptyTypeRegistry // Default. Json conversion is not enabled. 
diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala index 8056082c66f..6a33dfa1da1 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/functions.scala @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.Column +import org.apache.spark.sql.protobuf.utils.ProtobufUtils // scalastyle:off: object.name object functions { @@ -34,7 +35,8 @@ object functions { * @param messageName * the protobuf message name to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @param options * @since 3.4.0 */ @@ -44,8 +46,34 @@ object functions { messageName: String, descFilePath: String, options: java.util.Map[String, String]): Column = { + val descriptorFileContent = ProtobufUtils.readDescriptorFileContent(descFilePath) + from_protobuf(data, messageName, descriptorFileContent, options) + } + + /** + * Converts a binary column of Protobuf format into its corresponding catalyst value.The + * Protobuf definition is provided through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. 
+ * @param options + * @since 3.5.0 + */ + @Experimental + def from_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte], + options: java.util.Map[String, String]): Column = { new Column( - ProtobufDataToCatalyst(data.expr, messageName, Some(descFilePath), options.asScala.toMap) + ProtobufDataToCatalyst( + data.expr, messageName, Some(binaryFileDescriptorSet), options.asScala.toMap + ) ) } @@ -58,14 +86,33 @@ object functions { * @param messageName * the protobuf MessageName to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @since 3.4.0 */ @Experimental def from_protobuf(data: Column, messageName: String, descFilePath: String): Column = { - new Column(ProtobufDataToCatalyst(data.expr, messageName, descFilePath = Some(descFilePath))) - // TODO: Add an option for user to provide descriptor file content as a buffer. This - // gives flexibility in how the content is fetched. + val fileContent = ProtobufUtils.readDescriptorFileContent(descFilePath) + new Column(ProtobufDataToCatalyst(data.expr, messageName, Some(fileContent))) + } + + /** + * Converts a binary column of Protobuf format into its corresponding catalyst value.The + * Protobuf definition is provided through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. 
+ * @since 3.5.0 + */ + @Experimental + def from_protobuf(data: Column, messageName: String, binaryFileDescriptorSet: Array[Byte]) + : Column = { + new Column(ProtobufDataToCatalyst(data.expr, messageName, Some(binaryFileDescriptorSet))) } /** @@ -121,14 +168,34 @@ object functions { * @param messageName * the protobuf MessageName to look for in descriptor file. * @param descFilePath - * the protobuf descriptor file. + * The Protobuf descriptor file. This file is usually created using `protoc` with + * `--descriptor_set_out` and `--include_imports` options. * @since 3.4.0 */ @Experimental def to_protobuf(data: Column, messageName: String, descFilePath: String): Column = { - new Column(CatalystDataToProtobuf(data.expr, messageName, Some(descFilePath))) + to_protobuf(data, messageName, descFilePath, Map.empty[String, String].asJava) } + /** + * Converts a column into binary of protobuf format.The Protobuf definition is provided + * through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. + * + * @since 3.5.0 + */ + @Experimental + def to_protobuf(data: Column, messageName: String, binaryFileDescriptorSet: Array[Byte]) + : Column = { + new Column(CatalystDataToProtobuf(data.expr, messageName, Some(binaryFileDescriptorSet))) + } /** * Converts a column into binary of protobuf format. The Protobuf definition is provided * through Protobuf <i>descriptor file</i>. 
@@ -148,8 +215,37 @@ object functions { messageName: String, descFilePath: String, options: java.util.Map[String, String]): Column = { + val fileContent = ProtobufUtils.readDescriptorFileContent(descFilePath) + new Column( + CatalystDataToProtobuf(data.expr, messageName, Some(fileContent), options.asScala.toMap) + ) + } + + /** + * Converts a column into binary of protobuf format.The Protobuf definition is provided + * through Protobuf `FileDescriptorSet`. + * + * @param data + * the binary column. + * @param messageName + * the protobuf MessageName to look for in the descriptor set. + * @param binaryFileDescriptorSet + * Serialized Protobuf descriptor (`FileDescriptorSet`). Typically contents of file created + * using `protoc` with `--descriptor_set_out` and `--include_imports` options. + * @param options + * @since 3.5.0 + */ + @Experimental + def to_protobuf( + data: Column, + messageName: String, + binaryFileDescriptorSet: Array[Byte], + options: java.util.Map[String, String] + ): Column = { new Column( - CatalystDataToProtobuf(data.expr, messageName, Some(descFilePath), options.asScala.toMap) + CatalystDataToProtobuf( + data.expr, messageName, Some(binaryFileDescriptorSet), options.asScala.toMap + ) ) } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index 5688a483da0..6f49abfd208 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.protobuf.utils -import java.io.{BufferedInputStream, FileInputStream, IOException} +import java.io.File +import java.io.FileNotFoundException +import java.nio.file.NoSuchFileException import java.util.Locale import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import 
com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message} import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet} import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} import com.google.protobuf.TypeRegistry +import org.apache.commons.io.FileUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.errors.QueryCompilationErrors @@ -139,15 +143,16 @@ private[sql] object ProtobufUtils extends Logging { * Builds Protobuf message descriptor either from the Java class or from serialized descriptor * read from the file. * @param messageName - * Protobuf message name or Java class name. - * @param descFilePathOpt - * When the file name set, the descriptor and it's dependencies are read from the file. Other - * the `messageName` is treated as Java class name. + * Protobuf message name or Java class name (when binaryFileDescriptorSet is None).. + * @param binaryFileDescriptorSet + * When the binary `FileDescriptorSet` is provided, the descriptor and its dependencies are + * read from it. * @return */ - def buildDescriptor(messageName: String, descFilePathOpt: Option[String]): Descriptor = { - descFilePathOpt match { - case Some(filePath) => buildDescriptor(descFilePath = filePath, messageName) + def buildDescriptor(messageName: String, binaryFileDescriptorSet: Option[Array[Byte]]) + : Descriptor = { + binaryFileDescriptorSet match { + case Some(bytes) => buildDescriptor(bytes, messageName) case None => buildDescriptorFromJavaClass(messageName) } } @@ -208,9 +213,9 @@ private[sql] object ProtobufUtils extends Logging { .asInstanceOf[Descriptor] } - def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { + def buildDescriptor(binaryFileDescriptorSet: Array[Byte], messageName: String): Descriptor = { // Find the first message descriptor that matches the name. 
- val descriptorOpt = parseFileDescriptorSet(descFilePath) + val descriptorOpt = parseFileDescriptorSet(binaryFileDescriptorSet) .flatMap { fileDesc => fileDesc.getMessageTypes.asScala.find { desc => desc.getName == messageName || desc.getFullName == messageName @@ -223,28 +228,32 @@ private[sql] object ProtobufUtils extends Logging { } } - private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = { - var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null + def readDescriptorFileContent(filePath: String): Array[Byte] = { try { - val dscFile = new BufferedInputStream(new FileInputStream(descFilePath)) - fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) + FileUtils.readFileToByteArray(new File(filePath)) } catch { - case ex: InvalidProtocolBufferException => - throw QueryCompilationErrors.descriptorParseError(descFilePath, ex) - case ex: IOException => - throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) + case ex: FileNotFoundException => + throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, ex) + case ex: NoSuchFileException => + throw QueryCompilationErrors.cannotFindDescriptorFileError(filePath, ex) + case NonFatal(ex) => throw QueryCompilationErrors.descriptorParseError(ex) } + } + + private def parseFileDescriptorSet(bytes: Array[Byte]): List[Descriptors.FileDescriptor] = { + var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null try { - val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) - val fileDescriptorList: List[Descriptors.FileDescriptor] = - fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto => - buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex) - ).toList - fileDescriptorList + fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(bytes) } catch { - case e: Exception => - throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e) + case ex: 
InvalidProtocolBufferException => + throw QueryCompilationErrors.descriptorParseError(ex) } + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptorList: List[Descriptors.FileDescriptor] = + fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto => + buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex) + ).toList + fileDescriptorList } /** @@ -285,10 +294,10 @@ private[sql] object ProtobufUtils extends Logging { case n => s"field '${n.mkString(".")}'" } - /** Builds [[TypeRegistry]] with all the messages found in the descriptor file. */ - private[protobuf] def buildTypeRegistry(descFilePath: String): TypeRegistry = { + /** Builds [[TypeRegistry]] with all the messages found in the descriptor set. */ + private[protobuf] def buildTypeRegistry(descriptorBytes: Array[Byte]): TypeRegistry = { val registryBuilder = TypeRegistry.newBuilder() - for (fileDesc <- parseFileDescriptorSet(descFilePath)) { + for (fileDesc <- parseFileDescriptorSet(descriptorBytes)) { registryBuilder.add(fileDesc.getMessageTypes) } registryBuilder.build() diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index 3e9273835e3..fedb06103dc 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -37,7 +37,8 @@ class ProtobufCatalystDataConversionSuite with ExpressionEvalHelper with ProtobufTestBase { - private val testFileDesc = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") + private val testFileDescFile = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") + private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile) private val 
javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.CatalystTypes$" private def checkResultWithEval( @@ -45,22 +46,21 @@ class ProtobufCatalystDataConversionSuite descFilePath: String, messageName: String, expected: Any): Unit = { - + val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath) withClue("(Eval check with Java class name)") { val className = s"$javaClassNamePrefix$messageName" checkEvaluation( ProtobufDataToCatalyst( CatalystDataToProtobuf(data, className), - className, - descFilePath = None), + className), prepareExpectedResult(expected)) } withClue("(Eval check with descriptor file)") { checkEvaluation( ProtobufDataToCatalyst( - CatalystDataToProtobuf(data, messageName, Some(descFilePath)), + CatalystDataToProtobuf(data, messageName, Some(descBytes)), messageName, - descFilePath = Some(descFilePath)), + Some(descBytes)), prepareExpectedResult(expected)) } } @@ -71,15 +71,16 @@ class ProtobufCatalystDataConversionSuite actualSchema: String, badSchema: String): Unit = { - val binary = CatalystDataToProtobuf(data, actualSchema, Some(descFilePath)) + val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath) + val binary = CatalystDataToProtobuf(data, actualSchema, Some(descBytes)) intercept[Exception] { - ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" -> "FAILFAST")) + ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> "FAILFAST")) .eval() } val expected = { - val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, badSchema) + val expectedSchema = ProtobufUtils.buildDescriptor(descBytes, badSchema) SchemaConverters.toSqlType(expectedSchema).dataType match { case st: StructType => Row.fromSeq((0 until st.length).map { _ => @@ -90,7 +91,7 @@ class ProtobufCatalystDataConversionSuite } checkEvaluation( - ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" -> "PERMISSIVE")), + ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> 
"PERMISSIVE")), expected) } @@ -145,20 +146,20 @@ class ProtobufCatalystDataConversionSuite checkResultWithEval( input, - testFileDesc, + testFileDescFile, messageName, input.eval()) } } private def checkDeserialization( - descFilePath: String, + descFileBytes: Array[Byte], messageName: String, data: Message, expected: Option[Any], filters: StructFilters = new NoopFilters): Unit = { - val descriptor = ProtobufUtils.buildDescriptor(descFilePath, messageName) + val descriptor = ProtobufUtils.buildDescriptor(descFileBytes, messageName) val dataType = SchemaConverters.toSqlType(descriptor).dataType val deserializer = new ProtobufDeserializer(descriptor, dataType, filters) @@ -195,7 +196,7 @@ class ProtobufCatalystDataConversionSuite val data = RandomDataGenerator.randomRow(new scala.util.Random(seed), actualSchema) val converter = CatalystTypeConverters.createToCatalystConverter(actualSchema) val input = Literal.create(converter(data), actualSchema) - checkUnsupportedRead(input, testFileDesc, "Actual", "Bad") + checkUnsupportedRead(input, testFileDescFile, "Actual", "Bad") } } diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala index 7e6cf0a3c96..f0979209f86 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala @@ -40,10 +40,12 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot import testImplicits._ - val testFileDesc = testFile("functions_suite.desc", "protobuf/functions_suite.desc") + val testFileDescFile = testFile("functions_suite.desc", "protobuf/functions_suite.desc") + private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile) private val javaClassNamePrefix = 
"org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$" - val proto2FileDesc = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc") + val proto2FileDescFile = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc") + val proto2FileDesc = ProtobufUtils.readDescriptorFileContent(proto2FileDescFile) private val proto2JavaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.Proto2Messages$" private def emptyBinaryDF = Seq(Array[Byte]()).toDF("binary") @@ -52,7 +54,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot * Runs the given closure twice. Once with descriptor file and second time with Java class name. */ private def checkWithFileAndClassName(messageName: String)( - fn: (String, Option[String]) => Unit): Unit = { + fn: (String, Option[Array[Byte]]) => Unit): Unit = { withClue("(With descriptor file)") { fn(messageName, Some(testFileDesc)) } @@ -62,7 +64,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } private def checkWithProto2FileAndClassName(messageName: String)( - fn: (String, Option[String]) => Unit): Unit = { + fn: (String, Option[Array[Byte]]) => Unit): Unit = { withClue("(With descriptor file)") { fn(messageName, Some(proto2FileDesc)) } @@ -75,11 +77,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot private def from_protobuf_wrapper( col: Column, messageName: String, - descFilePathOpt: Option[String], + descBytesOpt: Option[Array[Byte]], options: Map[String, String] = Map.empty): Column = { - descFilePathOpt match { - case Some(descFilePath) => functions.from_protobuf( - col, messageName, descFilePath, options.asJava + descBytesOpt match { + case Some(descBytes) => functions.from_protobuf( + col, messageName, descBytes, options.asJava ) case None => functions.from_protobuf(col, messageName, options.asJava) } @@ -87,9 +89,9 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot // A 
wrapper to invoke the right variable of to_protobuf() depending on arguments. private def to_protobuf_wrapper( - col: Column, messageName: String, descFilePathOpt: Option[String]): Column = { - descFilePathOpt match { - case Some(descFilePath) => functions.to_protobuf(col, messageName, descFilePath) + col: Column, messageName: String, descBytesOpt: Option[Array[Byte]]): Column = { + descBytesOpt match { + case Some(descBytes) => functions.to_protobuf(col, messageName, descBytes) case None => functions.to_protobuf(col, messageName) } } @@ -116,11 +118,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot lit("0".getBytes).as("bytes_value")).as("SimpleMessage")) checkWithFileAndClassName("SimpleMessage") { - case (name, descFilePathOpt) => + case (name, descBytesOpt) => val protoStructDF = df.select( - to_protobuf_wrapper($"SimpleMessage", name, descFilePathOpt).as("proto")) + to_protobuf_wrapper($"SimpleMessage", name, descBytesOpt).as("proto")) val actualDf = protoStructDF.select( - from_protobuf_wrapper($"proto", name, descFilePathOpt).as("proto.*")) + from_protobuf_wrapper($"proto", name, descBytesOpt).as("proto.*")) checkAnswer(actualDf, df) } } @@ -460,9 +462,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } test("Handle extra fields : oldProducer -> newConsumer") { - val testFileDesc = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") - val oldProducer = ProtobufUtils.buildDescriptor(testFileDesc, "oldProducer") - val newConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "newConsumer") + val catalystTypesFile = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") + val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile) + + val oldProducer = ProtobufUtils.buildDescriptor(descBytes, "oldProducer") + val newConsumer = ProtobufUtils.buildDescriptor(descBytes, "newConsumer") val oldProducerMessage = DynamicMessage .newBuilder(oldProducer) @@ 
-472,17 +476,17 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val df = Seq(oldProducerMessage.toByteArray).toDF("oldProducerData") val fromProtoDf = df.select( functions - .from_protobuf($"oldProducerData", "newConsumer", testFileDesc) + .from_protobuf($"oldProducerData", "newConsumer", catalystTypesFile) .as("fromProto")) val toProtoDf = fromProtoDf.select( functions - .to_protobuf($"fromProto", "newConsumer", testFileDesc) + .to_protobuf($"fromProto", "newConsumer", descBytes) .as("toProto")) val toProtoDfToFromProtoDf = toProtoDf.select( functions - .from_protobuf($"toProto", "newConsumer", testFileDesc) + .from_protobuf($"toProto", "newConsumer", descBytes) .as("toProtoToFromProto")) val actualFieldNames = @@ -500,9 +504,11 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } test("Handle extra fields : newProducer -> oldConsumer") { - val testFileDesc = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") - val newProducer = ProtobufUtils.buildDescriptor(testFileDesc, "newProducer") - val oldConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "oldConsumer") + val catalystTypesFile = testFile("catalyst_types.desc", "protobuf/catalyst_types.desc") + val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile) + + val newProducer = ProtobufUtils.buildDescriptor(descBytes, "newProducer") + val oldConsumer = ProtobufUtils.buildDescriptor(descBytes, "oldConsumer") val newProducerMessage = DynamicMessage .newBuilder(newProducer) @@ -513,7 +519,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot val df = Seq(newProducerMessage.toByteArray).toDF("newProducerData") val fromProtoDf = df.select( functions - .from_protobuf($"newProducerData", "oldConsumer", testFileDesc) + .from_protobuf($"newProducerData", "oldConsumer", catalystTypesFile) .as("oldConsumerProto")) val expectedFieldNames = oldConsumer.getFields.asScala.map(f => f.getName) @@ 
-691,8 +697,8 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot } checkError( exception = e, - errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", - parameters = Map("descFilePath" -> testFileDescriptor)) + errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND", + parameters = Map("dependencyName" -> "nestedenum.proto")) } test("Verify OneOf field between from_protobuf -> to_protobuf and struct -> from_protobuf") { @@ -1042,7 +1048,6 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot parameters = Map("filePath" -> "/non/existent/path.desc") ) assert(ex.getCause != null) - assert(ex.getCause.getMessage.matches(".*No such file.*"), ex.getCause.getMessage()) } test("Recursive fields in arrays and maps") { @@ -1432,16 +1437,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot ).as("proto") ) - checkWithProto2FileAndClassName("Proto2AllTypes") { case (name, descFilePathOpt) => + checkWithProto2FileAndClassName("Proto2AllTypes") { case (name, descBytesOpt) => checkAnswer( explicitZero.select( - from_protobuf_wrapper($"raw_proto", name, descFilePathOpt).as("proto")), + from_protobuf_wrapper($"raw_proto", name, descBytesOpt).as("proto")), expected) checkAnswer( explicitZero.select(from_protobuf_wrapper( $"raw_proto", name, - descFilePathOpt, + descBytesOpt, Map("emit.default.values" -> "true")).as("proto")), expected) } diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala index 356cd20eb4e..8e9ff92621c 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala @@ -36,10 +36,13 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase { import ProtoSerdeSuite._ import 
ProtoSerdeSuite.MatchType._ - val testFileDesc = testFile("serde_suite.desc", "protobuf/serde_suite.desc") + private val testFileDescFile = testFile("serde_suite.desc", "protobuf/serde_suite.desc") + private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile) + private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$" - val proto2Desc = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc") + private val proto2DescFile = testFile("proto2_messages.desc", "protobuf/proto2_messages.desc") + private val proto2Desc = ProtobufUtils.readDescriptorFileContent(proto2DescFile) test("Test basic conversion") { withFieldMatchType { fieldMatch => @@ -209,25 +212,32 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase { test("raise cannot parse and construct protobuf descriptor error") { // passing serde_suite.proto instead serde_suite.desc - var testFileDesc = testFile("serde_suite.proto", "protobuf/serde_suite.proto") + var fileDescFile = testFile("serde_suite.proto", "protobuf/serde_suite.proto") + val e1 = intercept[AnalysisException] { - ProtobufUtils.buildDescriptor(testFileDesc, "SerdeBasicMessage") + ProtobufUtils.buildDescriptor( + ProtobufUtils.readDescriptorFileContent(fileDescFile), + "SerdeBasicMessage" + ) } checkError( exception = e1, - errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR", - parameters = Map("descFilePath" -> testFileDesc)) + errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR") + + fileDescFile = + testFile("basicmessage_noimports.desc", "protobuf/basicmessage_noimports.desc") - testFileDesc = testFile("basicmessage_noimports.desc", "protobuf/basicmessage_noimports.desc") val e2 = intercept[AnalysisException] { - ProtobufUtils.buildDescriptor(testFileDesc, "SerdeBasicMessage") + ProtobufUtils.buildDescriptor( + ProtobufUtils.readDescriptorFileContent(fileDescFile), + "SerdeBasicMessage") } checkError( exception = e2, - errorClass = 
"CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", - parameters = Map("descFilePath" -> testFileDesc)) + errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND", + parameters = Map("dependencyName" -> "nestedenum.proto")) } /** diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index f69832efe35..169ef86ad53 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -54,11 +54,6 @@ ], "sqlState" : "42846" }, - "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : { - "message" : [ - "Error constructing FileDescriptor for <descFilePath>." - ] - }, "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : { "message" : [ "Cannot convert Protobuf <protobufColumn> to SQL <sqlColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)." @@ -132,7 +127,7 @@ }, "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : { "message" : [ - "Error parsing file <descFilePath> descriptor byte[] into Descriptor object." + "Error parsing descriptor bytes into Protobuf FileDescriptorSet." ] }, "CANNOT_PARSE_TIMESTAMP" : { diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index b9e9a2ed997..13068729539 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -59,12 +59,6 @@ Ambiguous reference to the field `<field>`. It appears `<count>` times in the sc Cannot cast `<sourceType>` to `<targetType>`. -### CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR - -SQLSTATE: none assigned - -Error constructing FileDescriptor for `<descFilePath>`. 
- ### CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE SQLSTATE: none assigned diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 4b004eb8fd1..476149f42d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3363,10 +3363,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { messageParameters = Map("messageName" -> messageName)) } - def descriptorParseError(descFilePath: String, cause: Throwable): Throwable = { + def descriptorParseError(cause: Throwable): Throwable = { new AnalysisException( errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR", - messageParameters = Map("descFilePath" -> descFilePath), + messageParameters = Map.empty, cause = Option(cause)) } @@ -3377,13 +3377,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { cause = Option(cause)) } - def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = { - new AnalysisException( - errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", - messageParameters = Map("descFilePath" -> descFilePath), - cause = Option(cause)) - } - def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = { new AnalysisException( errorClass = "RECURSIVE_PROTOBUF_SCHEMA", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org