This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 07f71d2ba61 [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client
07f71d2ba61 is described below

commit 07f71d2ba61325331aabbc686ce30cb9012a6643
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Mar 9 14:59:32 2023 +0800

    [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client

    ### What changes were proposed in this pull request?
    This PR adds a new proto message:

    ```
    message Parse {
      // (Required) Input relation to Parse. The input is expected to have single text column.
      Relation input = 1;
      // (Required) The expected format of the text.
      ParseFormat format = 2;

      // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
      optional DataType schema = 3;

      // Options for the csv/json parser. The map key is case insensitive.
      map<string, string> options = 4;

      enum ParseFormat {
        PARSE_FORMAT_UNSPECIFIED = 0;
        PARSE_FORMAT_CSV = 1;
        PARSE_FORMAT_JSON = 2;
      }
    }
    ```

    and implements the CSV/JSON parsing functions for the Scala client.

    ### Why are the changes needed?
    Increases Spark Connect JVM client API coverage.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    - Pass GitHub Actions
    - Manually checked with Scala 2.13

    Closes #40332 from LuciferYang/SPARK-42690.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../org/apache/spark/sql/DataFrameReader.scala     |  52 +++++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  64 ++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  15 ++
 .../CheckConnectJvmClientCompatibility.scala       |   1 -
 .../main/protobuf/spark/connect/relations.proto    |  19 ++
 .../explain-results/csv_from_dataset.explain       |   1 +
 .../explain-results/json_from_dataset.explain      |   1 +
 .../query-tests/queries/csv_from_dataset.json      |  38 ++++
 .../query-tests/queries/csv_from_dataset.proto.bin | Bin 0 -> 156 bytes
 .../query-tests/queries/json_from_dataset.json     |  38 ++++
 .../queries/json_from_dataset.proto.bin            | Bin 0 -> 167 bytes
 .../sql/connect/planner/SparkConnectPlanner.scala  |  26 +++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 248 ++++++++++++---------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  97 ++++++++
 14 files changed, 491 insertions(+), 109 deletions(-)
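[Editor's note] Before the diff, a minimal usage sketch of the new overloads from the Spark Connect Scala client. It is not part of the commit; it assumes an already-created Connect session named `spark` and simply mirrors the end-to-end tests added below:

```scala
// Sketch only (not part of the commit); assumes a Spark Connect session `spark`.
val session = spark
import session.implicits._

// JSON: one JSON object per record in a Dataset[String]; the schema is inferred
// unless one is supplied via schema(). Option keys are case insensitive.
val jsonDs = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
val jsonDf = spark.read.option("allowSingleQuotes", "true").json(jsonDs)

// CSV: one CSV row per record; header handling and inference are driven by options.
val csvDs = Seq("name,age,city", """"Meng",84,"Shandong"""").toDS()
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvDs)
```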
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index d5641fb303a..ad921bcc4e3 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,8 +22,10 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -324,6 +326,20 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     format("json").load(paths: _*)
   }
 
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using the `schema` function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonDataset
+   *   input Dataset with one JSON object per record
+   * @since 3.4.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame =
+    parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
+
   /**
    * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
    * overloaded `csv()` method for more details.
@@ -351,6 +367,29 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   @scala.annotation.varargs
   def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
 
+  /**
+   * Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using the `schema` function and the `inferSchema` option is
+   * enabled, this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using the `schema` function and the `inferSchema` option is
+   * disabled, it determines the columns as string types and reads only the first line to
+   * determine the names and the number of fields.
+   *
+   * If `enforceSchema` is set to `false`, only the CSV header in the first line is checked to
+   * conform to the specified or inferred schema.
+   *
+   * @note
+   *   if the `header` option is set to `true` when calling this API, all lines matching the
+   *   header will be removed if they exist.
+   * @param csvDataset
+   *   input Dataset with one CSV row per record
+   * @since 3.4.0
+   */
+  def csv(csvDataset: Dataset[String]): DataFrame =
+    parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
+
   /**
    * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
    * other overloaded `parquet()` method for more details.
@@ -504,6 +543,19 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
+  private def parse(ds: Dataset[String], format: ParseFormat): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val parseBuilder = builder.getParseBuilder
+        .setInput(ds.plan.getRoot)
+        .setFormat(format)
+      userSpecifiedSchema.foreach(schema =>
+        parseBuilder.setSchema(DataTypeProtoConverter.toConnectProtoType(schema)))
+      extraOptions.foreach { case (k, v) =>
+        parseBuilder.putOptions(k, v)
+      }
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 780280144b5..466a51841d4 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -27,6 +27,9 @@ import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
 import org.apache.spark.sql.types._
@@ -644,6 +647,67 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
       .collect()
     assert(result sameElements expected)
   }
+
+  test("json from Dataset[String] inferSchema") {
+    val session = spark
+    import session.implicits._
+    val expected = Seq(
+      new GenericRowWithSchema(
+        Array(73, "Shandong", "Kong"),
+        new StructType().add("age", LongType).add("city", StringType).add("name", StringType)))
+    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+    val result = spark.read.option("allowSingleQuotes", "true").json(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("json from Dataset[String] with schema") {
+    val session = spark
+    import session.implicits._
+    val schema = new StructType().add("city", StringType).add("name", StringType)
+    val expected = Seq(new GenericRowWithSchema(Array("Shandong", "Kong"), schema))
+    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+    val result = spark.read.schema(schema).option("allowSingleQuotes", "true").json(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("json from Dataset[String] with invalid schema") {
+    val message = intercept[ParseException] {
+      spark.read.schema("123").json(spark.createDataset(Seq.empty[String])(StringEncoder))
+    }.getMessage
+    assert(message.contains("PARSE_SYNTAX_ERROR"))
+  }
+
+  test("csv from Dataset[String] inferSchema") {
+    val session = spark
+    import session.implicits._
+    val expected = Seq(
+      new GenericRowWithSchema(
+        Array("Meng", 84, "Shandong"),
+        new StructType().add("name", StringType).add("age", LongType).add("city", StringType)))
+    val ds = Seq("name,age,city", """"Meng",84,"Shandong"""").toDS()
+    val result = spark.read
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .csv(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("csv from Dataset[String] with schema") {
+    val session = spark
+    import session.implicits._
+    val schema = new StructType().add("name", StringType).add("age", LongType)
+    val expected = Seq(new GenericRowWithSchema(Array("Meng", 84), schema))
+    val ds = Seq(""""Meng",84,"Shandong"""").toDS()
+    val result = spark.read.schema(schema).csv(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("csv from Dataset[String] with invalid schema") {
+    val message = intercept[ParseException] {
+      spark.read.schema("123").csv(spark.createDataset(Seq.empty[String])(StringEncoder))
+    }.getMessage
+    assert(message.contains("PARSE_SYNTAX_ERROR"))
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 56c5111912a..0d295d17296 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.connect.client.SparkConnectClient
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
 import org.apache.spark.sql.expressions.Window
@@ -254,6 +255,13 @@ class PlanGenerationTestSuite
     session.read.json(testDataPath.resolve("people.json").toString)
   }
 
+  test("json from dataset") {
+    session.read
+      .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+      .option("allowSingleQuotes", "true")
+      .json(session.emptyDataset(StringEncoder))
+  }
+
   test("toJSON") {
     complex.toJSON
   }
@@ -262,6 +270,13 @@ class PlanGenerationTestSuite
     session.read.csv(testDataPath.resolve("people.csv").toString)
   }
 
+  test("csv from dataset") {
+    session.read
+      .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+      .option("header", "true")
+      .csv(session.emptyDataset(StringEncoder))
+  }
+
   test("read parquet") {
     session.read.parquet(testDataPath.resolve("users.parquet").toString)
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 868e7ae7b74..ae6c6c86fec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -131,7 +131,6 @@ object CheckConnectJvmClientCompatibility {
 
       // DataFrame Reader & Writer
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index ab67ade9fb7..97fc3a474f3 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -62,6 +62,7 @@ message Relation {
     RepartitionByExpression repartition_by_expression = 27;
     FrameMap frame_map = 28;
     CollectMetrics collect_metrics = 29;
+    Parse parse = 30;
 
     // NA functions
     NAFill fill_na = 90;
@@ -798,3 +799,21 @@ message CollectMetrics {
   // (Required) The metric sequence.
   repeated Expression metrics = 3;
 }
+
+message Parse {
+  // (Required) Input relation to Parse. The input is expected to have single text column.
+  Relation input = 1;
+  // (Required) The expected format of the text.
+  ParseFormat format = 2;
+
+  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
+  optional DataType schema = 3;
+
+  // Options for the csv/json parser. The map key is case insensitive.
+  map<string, string> options = 4;
+  enum ParseFormat {
+    PARSE_FORMAT_UNSPECIFIED = 0;
+    PARSE_FORMAT_CSV = 1;
+    PARSE_FORMAT_JSON = 2;
+  }
+}
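[Editor's note] On the wire, the client's private `parse` helper boils down to populating this message. A hypothetical, hand-rolled sketch using the generated protobuf builders; the empty `input` relation is a placeholder (in practice it is the plan root of a `Dataset[String]`), and this is not how the commit constructs it:

```scala
import org.apache.spark.connect.proto

// Placeholder input relation; any relation producing a single text column would do.
val input: proto.Relation = proto.Relation.newBuilder().build()

val rel: proto.Relation = proto.Relation
  .newBuilder()
  .setParse(
    proto.Parse
      .newBuilder()
      .setInput(input)
      .setFormat(proto.Parse.ParseFormat.PARSE_FORMAT_CSV)
      .putOptions("header", "true") // keys are matched case-insensitively on the server
      .build())
  .build()
```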
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
new file mode 100644
index 00000000000..d34fcb6f758
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
@@ -0,0 +1,38 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "parse": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+      }
+    },
+    "format": "PARSE_FORMAT_CSV",
+    "schema": {
+      "struct": {
+        "fields": [{
+          "name": "c1",
+          "dataType": {
+            "string": {
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "c2",
+          "dataType": {
+            "integer": {
+            }
+          },
+          "nullable": true
+        }]
+      }
+    },
+    "options": {
+      "header": "true"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin
new file mode 100644
index 00000000000..5f8bd50685c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
new file mode 100644
index 00000000000..d6f992d09a5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
@@ -0,0 +1,38 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "parse": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+      }
+    },
+    "format": "PARSE_FORMAT_JSON",
+    "schema": {
+      "struct": {
+        "fields": [{
+          "name": "c1",
+          "dataType": {
+            "string": {
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "c2",
+          "dataType": {
+            "integer": {
+            }
+          },
+          "nullable": true
+        }]
+      }
+    },
+    "options": {
+      "allowsinglequotes": "true"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin
new file mode 100644
index 00000000000..0fce9d9ff8c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin differ
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b51dbfa6602..9a8402a1e98 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -29,6 +29,7 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanResponse, SqlCommand}
 import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
+import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.sql.{Column, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedFunction, UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
@@ -117,6 +118,7 @@ class SparkConnectPlanner(val session: SparkSession) {
         transformFrameMap(rel.getFrameMap)
       case proto.Relation.RelTypeCase.COLLECT_METRICS =>
         transformCollectMetrics(rel.getCollectMetrics)
+      case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
       case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
         throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
 
@@ -733,6 +735,30 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  private def transformParse(rel: proto.Parse): LogicalPlan = {
+    def dataFrameReader = {
+      val localMap = CaseInsensitiveMap[String](rel.getOptionsMap.asScala.toMap)
+      val reader = session.read
+      if (rel.hasSchema) {
+        DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
+          case s: StructType => reader.schema(s)
+          case other => throw InvalidPlanInput(s"Invalid schema dataType $other")
+        }
+      }
+      localMap.foreach { case (key, value) => reader.option(key, value) }
+      reader
+    }
+    def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
+
+    rel.getFormat match {
+      case ParseFormat.PARSE_FORMAT_CSV =>
+        dataFrameReader.csv(ds).queryExecution.analyzed
+      case ParseFormat.PARSE_FORMAT_JSON =>
+        dataFrameReader.json(ds).queryExecution.analyzed
+      case _ => throw InvalidPlanInput("Does not support " + rel.getFormat.name())
+    }
+  }
+
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index e577749c3ed..81fa3916c5a 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xfb\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xa9\x13\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
@@ -93,9 +93,12 @@ _TOSCHEMA = DESCRIPTOR.message_types_by_name["ToSchema"]
 _REPARTITIONBYEXPRESSION = DESCRIPTOR.message_types_by_name["RepartitionByExpression"]
 _FRAMEMAP = DESCRIPTOR.message_types_by_name["FrameMap"]
 _COLLECTMETRICS = DESCRIPTOR.message_types_by_name["CollectMetrics"]
+_PARSE = DESCRIPTOR.message_types_by_name["Parse"]
+_PARSE_OPTIONSENTRY = _PARSE.nested_types_by_name["OptionsEntry"]
 _JOIN_JOINTYPE = _JOIN.enum_types_by_name["JoinType"]
 _SETOPERATION_SETOPTYPE = _SETOPERATION.enum_types_by_name["SetOpType"]
 _AGGREGATE_GROUPTYPE = _AGGREGATE.enum_types_by_name["GroupType"]
+_PARSE_PARSEFORMAT = _PARSE.enum_types_by_name["ParseFormat"]
 Relation = _reflection.GeneratedProtocolMessageType(
     "Relation",
     (_message.Message,),
@@ -648,6 +651,27 @@ CollectMetrics = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(CollectMetrics)
 
+Parse = _reflection.GeneratedProtocolMessageType(
+    "Parse",
+    (_message.Message,),
+    {
+        "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+            "OptionsEntry",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _PARSE_OPTIONSENTRY,
+                "__module__": "spark.connect.relations_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.Parse.OptionsEntry)
+            },
+        ),
+        "DESCRIPTOR": _PARSE,
+        "__module__": "spark.connect.relations_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.Parse)
+    },
+)
+_sym_db.RegisterMessage(Parse)
+_sym_db.RegisterMessage(Parse.OptionsEntry)
+
 if _descriptor._USE_C_DESCRIPTORS == False:
 
     DESCRIPTOR._options = None
@@ -658,112 +682,120 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
     _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._options = None
     _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001"
+    _PARSE_OPTIONSENTRY._options = None
+    _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
     _RELATION._serialized_start = 165
-    _RELATION._serialized_end = 2592
-    _UNKNOWN._serialized_start = 2594
-    _UNKNOWN._serialized_end = 2603
-    _RELATIONCOMMON._serialized_start = 2605
-    _RELATIONCOMMON._serialized_end = 2696
-    _SQL._serialized_start = 2699
-    _SQL._serialized_end = 2833
-    _SQL_ARGSENTRY._serialized_start = 2778
-    _SQL_ARGSENTRY._serialized_end = 2833
-    _READ._serialized_start = 2836
-    _READ._serialized_end = 3332
-    _READ_NAMEDTABLE._serialized_start = 2978
-    _READ_NAMEDTABLE._serialized_end = 3039
-    _READ_DATASOURCE._serialized_start = 3042
-    _READ_DATASOURCE._serialized_end = 3319
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3239
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3297
-    _PROJECT._serialized_start = 3334
-    _PROJECT._serialized_end = 3451
-    _FILTER._serialized_start = 3453
-    _FILTER._serialized_end = 3565
-    _JOIN._serialized_start = 3568
-    _JOIN._serialized_end = 4039
-    _JOIN_JOINTYPE._serialized_start = 3831
-    _JOIN_JOINTYPE._serialized_end = 4039
-    _SETOPERATION._serialized_start = 4042
-    _SETOPERATION._serialized_end = 4521
-    _SETOPERATION_SETOPTYPE._serialized_start = 4358
-    _SETOPERATION_SETOPTYPE._serialized_end = 4472
-    _LIMIT._serialized_start = 4523
-    _LIMIT._serialized_end = 4599
-    _OFFSET._serialized_start = 4601
-    _OFFSET._serialized_end = 4680
-    _TAIL._serialized_start = 4682
-    _TAIL._serialized_end = 4757
-    _AGGREGATE._serialized_start = 4760
-    _AGGREGATE._serialized_end = 5342
-    _AGGREGATE_PIVOT._serialized_start = 5099
-    _AGGREGATE_PIVOT._serialized_end = 5210
-    _AGGREGATE_GROUPTYPE._serialized_start = 5213
-    _AGGREGATE_GROUPTYPE._serialized_end = 5342
-    _SORT._serialized_start = 5345
-    _SORT._serialized_end = 5505
-    _DROP._serialized_start = 5508
-    _DROP._serialized_end = 5649
-    _DEDUPLICATE._serialized_start = 5652
-    _DEDUPLICATE._serialized_end = 5823
-    _LOCALRELATION._serialized_start = 5825
-    _LOCALRELATION._serialized_end = 5914
-    _SAMPLE._serialized_start = 5917
-    _SAMPLE._serialized_end = 6190
-    _RANGE._serialized_start = 6193
-    _RANGE._serialized_end = 6338
-    _SUBQUERYALIAS._serialized_start = 6340
-    _SUBQUERYALIAS._serialized_end = 6454
-    _REPARTITION._serialized_start = 6457
-    _REPARTITION._serialized_end = 6599
-    _SHOWSTRING._serialized_start = 6602
-    _SHOWSTRING._serialized_end = 6744
-    _STATSUMMARY._serialized_start = 6746
-    _STATSUMMARY._serialized_end = 6838
-    _STATDESCRIBE._serialized_start = 6840
-    _STATDESCRIBE._serialized_end = 6921
-    _STATCROSSTAB._serialized_start = 6923
-    _STATCROSSTAB._serialized_end = 7024
-    _STATCOV._serialized_start = 7026
-    _STATCOV._serialized_end = 7122
-    _STATCORR._serialized_start = 7125
-    _STATCORR._serialized_end = 7262
-    _STATAPPROXQUANTILE._serialized_start = 7265
-    _STATAPPROXQUANTILE._serialized_end = 7429
-    _STATFREQITEMS._serialized_start = 7431
-    _STATFREQITEMS._serialized_end = 7556
-    _STATSAMPLEBY._serialized_start = 7559
-    _STATSAMPLEBY._serialized_end = 7868
-    _STATSAMPLEBY_FRACTION._serialized_start = 7760
-    _STATSAMPLEBY_FRACTION._serialized_end = 7859
-    _NAFILL._serialized_start = 7871
-    _NAFILL._serialized_end = 8005
-    _NADROP._serialized_start = 8008
-    _NADROP._serialized_end = 8142
-    _NAREPLACE._serialized_start = 8145
-    _NAREPLACE._serialized_end = 8441
-    _NAREPLACE_REPLACEMENT._serialized_start = 8300
-    _NAREPLACE_REPLACEMENT._serialized_end = 8441
-    _TODF._serialized_start = 8443
-    _TODF._serialized_end = 8531
-    _WITHCOLUMNSRENAMED._serialized_start = 8534
-    _WITHCOLUMNSRENAMED._serialized_end = 8773
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8706
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8773
-    _WITHCOLUMNS._serialized_start = 8775
-    _WITHCOLUMNS._serialized_end = 8894
-    _HINT._serialized_start = 8897
-    _HINT._serialized_end = 9029
-    _UNPIVOT._serialized_start = 9032
-    _UNPIVOT._serialized_end = 9359
-    _UNPIVOT_VALUES._serialized_start = 9289
-    _UNPIVOT_VALUES._serialized_end = 9348
-    _TOSCHEMA._serialized_start = 9361
-    _TOSCHEMA._serialized_end = 9467
-    _REPARTITIONBYEXPRESSION._serialized_start = 9470
-    _REPARTITIONBYEXPRESSION._serialized_end = 9673
-    _FRAMEMAP._serialized_start = 9675
-    _FRAMEMAP._serialized_end = 9800
-    _COLLECTMETRICS._serialized_start = 9803
-    _COLLECTMETRICS._serialized_end = 9939
+    _RELATION._serialized_end = 2638
+    _UNKNOWN._serialized_start = 2640
+    _UNKNOWN._serialized_end = 2649
+    _RELATIONCOMMON._serialized_start = 2651
+    _RELATIONCOMMON._serialized_end = 2742
+    _SQL._serialized_start = 2745
+    _SQL._serialized_end = 2879
+    _SQL_ARGSENTRY._serialized_start = 2824
+    _SQL_ARGSENTRY._serialized_end = 2879
+    _READ._serialized_start = 2882
+    _READ._serialized_end = 3378
+    _READ_NAMEDTABLE._serialized_start = 3024
+    _READ_NAMEDTABLE._serialized_end = 3085
+    _READ_DATASOURCE._serialized_start = 3088
+    _READ_DATASOURCE._serialized_end = 3365
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3285
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3343
+    _PROJECT._serialized_start = 3380
+    _PROJECT._serialized_end = 3497
+    _FILTER._serialized_start = 3499
+    _FILTER._serialized_end = 3611
+    _JOIN._serialized_start = 3614
+    _JOIN._serialized_end = 4085
+    _JOIN_JOINTYPE._serialized_start = 3877
+    _JOIN_JOINTYPE._serialized_end = 4085
+    _SETOPERATION._serialized_start = 4088
+    _SETOPERATION._serialized_end = 4567
+    _SETOPERATION_SETOPTYPE._serialized_start = 4404
+    _SETOPERATION_SETOPTYPE._serialized_end = 4518
+    _LIMIT._serialized_start = 4569
+    _LIMIT._serialized_end = 4645
+    _OFFSET._serialized_start = 4647
+    _OFFSET._serialized_end = 4726
+    _TAIL._serialized_start = 4728
+    _TAIL._serialized_end = 4803
+    _AGGREGATE._serialized_start = 4806
+    _AGGREGATE._serialized_end = 5388
+    _AGGREGATE_PIVOT._serialized_start = 5145
+    _AGGREGATE_PIVOT._serialized_end = 5256
+    _AGGREGATE_GROUPTYPE._serialized_start = 5259
+    _AGGREGATE_GROUPTYPE._serialized_end = 5388
+    _SORT._serialized_start = 5391
+    _SORT._serialized_end = 5551
+    _DROP._serialized_start = 5554
+    _DROP._serialized_end = 5695
+    _DEDUPLICATE._serialized_start = 5698
+    _DEDUPLICATE._serialized_end = 5869
+    _LOCALRELATION._serialized_start = 5871
+    _LOCALRELATION._serialized_end = 5960
+    _SAMPLE._serialized_start = 5963
+    _SAMPLE._serialized_end = 6236
+    _RANGE._serialized_start = 6239
+    _RANGE._serialized_end = 6384
+    _SUBQUERYALIAS._serialized_start = 6386
+    _SUBQUERYALIAS._serialized_end = 6500
+    _REPARTITION._serialized_start = 6503
+    _REPARTITION._serialized_end = 6645
+    _SHOWSTRING._serialized_start = 6648
+    _SHOWSTRING._serialized_end = 6790
+    _STATSUMMARY._serialized_start = 6792
+    _STATSUMMARY._serialized_end = 6884
+    _STATDESCRIBE._serialized_start = 6886
+    _STATDESCRIBE._serialized_end = 6967
+    _STATCROSSTAB._serialized_start = 6969
+    _STATCROSSTAB._serialized_end = 7070
+    _STATCOV._serialized_start = 7072
+    _STATCOV._serialized_end = 7168
+    _STATCORR._serialized_start = 7171
+    _STATCORR._serialized_end = 7308
+    _STATAPPROXQUANTILE._serialized_start = 7311
+    _STATAPPROXQUANTILE._serialized_end = 7475
+    _STATFREQITEMS._serialized_start = 7477
+    _STATFREQITEMS._serialized_end = 7602
+    _STATSAMPLEBY._serialized_start = 7605
+    _STATSAMPLEBY._serialized_end = 7914
+    _STATSAMPLEBY_FRACTION._serialized_start = 7806
+    _STATSAMPLEBY_FRACTION._serialized_end = 7905
+    _NAFILL._serialized_start = 7917
+    _NAFILL._serialized_end = 8051
+    _NADROP._serialized_start = 8054
+    _NADROP._serialized_end = 8188
+    _NAREPLACE._serialized_start = 8191
+    _NAREPLACE._serialized_end = 8487
+    _NAREPLACE_REPLACEMENT._serialized_start = 8346
+    _NAREPLACE_REPLACEMENT._serialized_end = 8487
+    _TODF._serialized_start = 8489
+    _TODF._serialized_end = 8577
+    _WITHCOLUMNSRENAMED._serialized_start = 8580
+    _WITHCOLUMNSRENAMED._serialized_end = 8819
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8752
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8819
+    _WITHCOLUMNS._serialized_start = 8821
+    _WITHCOLUMNS._serialized_end = 8940
+    _HINT._serialized_start = 8943
+    _HINT._serialized_end = 9075
+    _UNPIVOT._serialized_start = 9078
+    _UNPIVOT._serialized_end = 9405
+    _UNPIVOT_VALUES._serialized_start = 9335
+    _UNPIVOT_VALUES._serialized_end = 9394
+    _TOSCHEMA._serialized_start = 9407
+    _TOSCHEMA._serialized_end = 9513
+    _REPARTITIONBYEXPRESSION._serialized_start = 9516
+    _REPARTITIONBYEXPRESSION._serialized_end = 9719
+    _FRAMEMAP._serialized_start = 9721
+    _FRAMEMAP._serialized_end = 9846
+    _COLLECTMETRICS._serialized_start = 9849
+    _COLLECTMETRICS._serialized_end = 9985
+    _PARSE._serialized_start = 9988
+    _PARSE._serialized_end = 10376
+    _PARSE_OPTIONSENTRY._serialized_start = 3285
+    _PARSE_OPTIONSENTRY._serialized_end = 3343
+    _PARSE_PARSEFORMAT._serialized_start = 10277
+    _PARSE_PARSEFORMAT._serialized_end = 10365
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index d434451082e..edaab7bcb77 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -91,6 +91,7 @@ class Relation(google.protobuf.message.Message):
     REPARTITION_BY_EXPRESSION_FIELD_NUMBER: builtins.int
     FRAME_MAP_FIELD_NUMBER: builtins.int
     COLLECT_METRICS_FIELD_NUMBER: builtins.int
+    PARSE_FIELD_NUMBER: builtins.int
     FILL_NA_FIELD_NUMBER: builtins.int
     DROP_NA_FIELD_NUMBER: builtins.int
     REPLACE_FIELD_NUMBER: builtins.int
@@ -164,6 +165,8 @@ class Relation(google.protobuf.message.Message):
     @property
     def collect_metrics(self) -> global___CollectMetrics: ...
     @property
+    def parse(self) -> global___Parse: ...
+    @property
     def fill_na(self) -> global___NAFill:
         """NA functions"""
     @property
@@ -229,6 +232,7 @@ class Relation(google.protobuf.message.Message):
         repartition_by_expression: global___RepartitionByExpression | None = ...,
         frame_map: global___FrameMap | None = ...,
         collect_metrics: global___CollectMetrics | None = ...,
+        parse: global___Parse | None = ...,
         fill_na: global___NAFill | None = ...,
         drop_na: global___NADrop | None = ...,
         replace: global___NAReplace | None = ...,
@@ -291,6 +295,8 @@ class Relation(google.protobuf.message.Message):
             b"local_relation",
             "offset",
             b"offset",
+            "parse",
+            b"parse",
             "project",
             b"project",
             "range",
@@ -384,6 +390,8 @@ class Relation(google.protobuf.message.Message):
             b"local_relation",
            "offset",
            b"offset",
+            "parse",
+            b"parse",
            "project",
            b"project",
            "range",
@@ -461,6 +469,7 @@ class Relation(google.protobuf.message.Message):
         "repartition_by_expression",
         "frame_map",
         "collect_metrics",
+        "parse",
         "fill_na",
         "drop_na",
         "replace",
@@ -2763,3 +2772,91 @@ class CollectMetrics(google.protobuf.message.Message):
     ) -> None: ...
 
 global___CollectMetrics = CollectMetrics
+
+class Parse(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class _ParseFormat:
+        ValueType = typing.NewType("ValueType", builtins.int)
+        V: typing_extensions.TypeAlias = ValueType
+
+    class _ParseFormatEnumTypeWrapper(
+        google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Parse._ParseFormat.ValueType],
+        builtins.type,
+    ):  # noqa: F821
+        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+        PARSE_FORMAT_UNSPECIFIED: Parse._ParseFormat.ValueType  # 0
+        PARSE_FORMAT_CSV: Parse._ParseFormat.ValueType  # 1
+        PARSE_FORMAT_JSON: Parse._ParseFormat.ValueType  # 2
+
+    class ParseFormat(_ParseFormat, metaclass=_ParseFormatEnumTypeWrapper): ...
+    PARSE_FORMAT_UNSPECIFIED: Parse.ParseFormat.ValueType  # 0
+    PARSE_FORMAT_CSV: Parse.ParseFormat.ValueType  # 1
+    PARSE_FORMAT_JSON: Parse.ParseFormat.ValueType  # 2
+
+    class OptionsEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        value: builtins.str
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]
+        ) -> None: ...
+
+    INPUT_FIELD_NUMBER: builtins.int
+    FORMAT_FIELD_NUMBER: builtins.int
+    SCHEMA_FIELD_NUMBER: builtins.int
+    OPTIONS_FIELD_NUMBER: builtins.int
+    @property
+    def input(self) -> global___Relation:
+        """(Required) Input relation to Parse. The input is expected to have single text column."""
+    format: global___Parse.ParseFormat.ValueType
+    """(Required) The expected format of the text."""
+    @property
+    def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+        """(Optional) DataType representing the schema. If not set, Spark will infer the schema."""
+    @property
+    def options(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+        """Options for the csv/json parser. The map key is case insensitive."""
+    def __init__(
+        self,
+        *,
+        input: global___Relation | None = ...,
+        format: global___Parse.ParseFormat.ValueType = ...,
+        schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        options: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_schema", b"_schema", "input", b"input", "schema", b"schema"
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_schema",
+            b"_schema",
+            "format",
+            b"format",
+            "input",
+            b"input",
+            "options",
+            b"options",
+            "schema",
+            b"schema",
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+    ) -> typing_extensions.Literal["schema"] | None: ...
+
+global___Parse = Parse

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org