This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 07f71d2ba61 [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client
07f71d2ba61 is described below

commit 07f71d2ba61325331aabbc686ce30cb9012a6643
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Mar 9 14:59:32 2023 +0800

    [SPARK-42690][CONNECT] Implement CSV/JSON parsing functions for Scala client
    
    ### What changes were proposed in this pull request?
    This PR adds a new proto message:
    
    ```
    message Parse {
      // (Required) Input relation to Parse. The input is expected to have a single text column.
      Relation input = 1;
      // (Required) The expected format of the text.
      ParseFormat format = 2;
    
      // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
      optional DataType schema = 3;
    
      // Options for the csv/json parser. The map key is case insensitive.
      map<string, string> options = 4;
      enum ParseFormat {
        PARSE_FORMAT_UNSPECIFIED = 0;
        PARSE_FORMAT_CSV = 1;
        PARSE_FORMAT_JSON = 2;
      }
    }
    ```
    
    and implements the corresponding CSV/JSON parsing functions for the Scala client.
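    
    As an illustration, a minimal usage sketch of the new client APIs (not part of this commit; it assumes a Spark Connect `SparkSession` named `spark` with its implicits in scope, and the sample records are made up):
    
    ```
    import spark.implicits._
    
    // Parse newline-delimited JSON held in a Dataset[String]; schema is inferred.
    val jsonDs = Seq("""{"name":"Kong","age":73}""").toDS()
    val jsonDf = spark.read.json(jsonDs)
    
    // Parse CSV rows held in a Dataset[String]; the first line is the header.
    val csvDs = Seq("name,age", "Meng,84").toDS()
    val csvDf = spark.read.option("header", "true").csv(csvDs)
    ```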
    
    ### Why are the changes needed?
    Improves Spark Connect JVM client API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Passes GitHub Actions
    - Manually checked with Scala 2.13
    
    Closes #40332 from LuciferYang/SPARK-42690.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../org/apache/spark/sql/DataFrameReader.scala     |  52 +++++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  64 ++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  15 ++
 .../CheckConnectJvmClientCompatibility.scala       |   1 -
 .../main/protobuf/spark/connect/relations.proto    |  19 ++
 .../explain-results/csv_from_dataset.explain       |   1 +
 .../explain-results/json_from_dataset.explain      |   1 +
 .../query-tests/queries/csv_from_dataset.json      |  38 ++++
 .../query-tests/queries/csv_from_dataset.proto.bin | Bin 0 -> 156 bytes
 .../query-tests/queries/json_from_dataset.json     |  38 ++++
 .../queries/json_from_dataset.proto.bin            | Bin 0 -> 167 bytes
 .../sql/connect/planner/SparkConnectPlanner.scala  |  26 +++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 248 ++++++++++++---------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  97 ++++++++
 14 files changed, 491 insertions(+), 109 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index d5641fb303a..ad921bcc4e3 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,8 +22,10 @@ import java.util.Properties
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -324,6 +326,20 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     format("json").load(paths: _*)
   }
 
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using the `schema` function, this function goes through the input
+   * once to determine the input schema.
+   *
+   * @param jsonDataset
+   *   input Dataset with one JSON object per record
+   * @since 3.4.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame =
+    parse(jsonDataset, ParseFormat.PARSE_FORMAT_JSON)
+
   /**
   * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
    * overloaded `csv()` method for more details.
@@ -351,6 +367,29 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   @scala.annotation.varargs
   def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
 
+  /**
+   * Loads a `Dataset[String]` storing CSV rows and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using the `schema` function and the `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using the `schema` function and the `inferSchema` option is
+   * disabled, it determines the columns as string types and it reads only the first line to
+   * determine the names and the number of fields.
+   *
+   * If `enforceSchema` is set to `false`, only the CSV header in the first line is checked to
+   * conform to the specified or inferred schema.
+   *
+   * @note
+   *   if the `header` option is set to `true` when calling this API, all lines identical to the
+   *   header will be removed if they exist.
+   * @param csvDataset
+   *   input Dataset with one CSV row per record
+   * @since 3.4.0
+   */
+  def csv(csvDataset: Dataset[String]): DataFrame =
+    parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
+
   /**
   * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
    * other overloaded `parquet()` method for more details.
@@ -504,6 +543,19 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
+  private def parse(ds: Dataset[String], format: ParseFormat): DataFrame = {
+    sparkSession.newDataFrame { builder =>
+      val parseBuilder = builder.getParseBuilder
+        .setInput(ds.plan.getRoot)
+        .setFormat(format)
+      userSpecifiedSchema.foreach(schema =>
+        parseBuilder.setSchema(DataTypeProtoConverter.toConnectProtoType(schema)))
+      extraOptions.foreach { case (k, v) =>
+        parseBuilder.putOptions(k, v)
+      }
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 780280144b5..466a51841d4 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -27,6 +27,9 @@ import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, count, lit, rand, sequence, shuffle, struct, transform, udf}
 import org.apache.spark.sql.types._
@@ -644,6 +647,67 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
       .collect()
     assert(result sameElements expected)
   }
+
+  test("json from Dataset[String] inferSchema") {
+    val session = spark
+    import session.implicits._
+    val expected = Seq(
+      new GenericRowWithSchema(
+        Array(73, "Shandong", "Kong"),
+        new StructType().add("age", LongType).add("city", 
StringType).add("name", StringType)))
+    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+    val result = spark.read.option("allowSingleQuotes", "true").json(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("json from Dataset[String] with schema") {
+    val session = spark
+    import session.implicits._
+    val schema = new StructType().add("city", StringType).add("name", 
StringType)
+    val expected = Seq(new GenericRowWithSchema(Array("Shandong", "Kong"), 
schema))
+    val ds = Seq("""{"name":"Kong","age":73,"city":'Shandong'}""").toDS()
+    val result = spark.read.schema(schema).option("allowSingleQuotes", 
"true").json(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("json from Dataset[String] with invalid schema") {
+    val message = intercept[ParseException] {
+      spark.read.schema("123").json(spark.createDataset(Seq.empty[String])(StringEncoder))
+    }.getMessage
+    assert(message.contains("PARSE_SYNTAX_ERROR"))
+  }
+
+  test("csv from Dataset[String] inferSchema") {
+    val session = spark
+    import session.implicits._
+    val expected = Seq(
+      new GenericRowWithSchema(
+        Array("Meng", 84, "Shandong"),
+        new StructType().add("name", StringType).add("age", 
LongType).add("city", StringType)))
+    val ds = Seq("name,age,city", """"Meng",84,"Shandong"""").toDS()
+    val result = spark.read
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .csv(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("csv from Dataset[String] with schema") {
+    val session = spark
+    import session.implicits._
+    val schema = new StructType().add("name", StringType).add("age", LongType)
+    val expected = Seq(new GenericRowWithSchema(Array("Meng", 84), schema))
+    val ds = Seq(""""Meng",84,"Shandong"""").toDS()
+    val result = spark.read.schema(schema).csv(ds)
+    checkSameResult(expected, result)
+  }
+
+  test("csv from Dataset[String] with invalid schema") {
+    val message = intercept[ParseException] {
+      spark.read.schema("123").csv(spark.createDataset(Seq.empty[String])(StringEncoder))
+    }.getMessage
+    assert(message.contains("PARSE_SYNTAX_ERROR"))
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 56c5111912a..0d295d17296 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.connect.client.SparkConnectClient
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
 import org.apache.spark.sql.expressions.Window
@@ -254,6 +255,13 @@ class PlanGenerationTestSuite
     session.read.json(testDataPath.resolve("people.json").toString)
   }
 
+  test("json from dataset") {
+    session.read
+      .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+      .option("allowSingleQuotes", "true")
+      .json(session.emptyDataset(StringEncoder))
+  }
+
   test("toJSON") {
     complex.toJSON
   }
@@ -262,6 +270,13 @@ class PlanGenerationTestSuite
     session.read.csv(testDataPath.resolve("people.csv").toString)
   }
 
+  test("csv from dataset") {
+    session.read
+      .schema(new StructType().add("c1", StringType).add("c2", IntegerType))
+      .option("header", "true")
+      .csv(session.emptyDataset(StringEncoder))
+  }
+
   test("read parquet") {
     session.read.parquet(testDataPath.resolve("users.parquet").toString)
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 868e7ae7b74..ae6c6c86fec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -131,7 +131,6 @@ object CheckConnectJvmClientCompatibility {
 
       // DataFrame Reader & Writer
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
 
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index ab67ade9fb7..97fc3a474f3 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -62,6 +62,7 @@ message Relation {
     RepartitionByExpression repartition_by_expression = 27;
     FrameMap frame_map = 28;
     CollectMetrics collect_metrics = 29;
+    Parse parse = 30;
 
     // NA functions
     NAFill fill_na = 90;
@@ -798,3 +799,21 @@ message CollectMetrics {
   // (Required) The metric sequence.
   repeated Expression metrics = 3;
 }
+
+message Parse {
+  // (Required) Input relation to Parse. The input is expected to have a single text column.
+  Relation input = 1;
+  // (Required) The expected format of the text.
+  ParseFormat format = 2;
+
+  // (Optional) DataType representing the schema. If not set, Spark will infer the schema.
+  optional DataType schema = 3;
+
+  // Options for the csv/json parser. The map key is case insensitive.
+  map<string, string> options = 4;
+  enum ParseFormat {
+    PARSE_FORMAT_UNSPECIFIED = 0;
+    PARSE_FORMAT_CSV = 1;
+    PARSE_FORMAT_JSON = 2;
+  }
+}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/csv_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
new file mode 100644
index 00000000000..9fbaa9fcede
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/json_from_dataset.explain
@@ -0,0 +1 @@
+LogicalRDD [c1#0, c2#0], false
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
new file mode 100644
index 00000000000..d34fcb6f758
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.json
@@ -0,0 +1,38 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "parse": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": 
"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+      }
+    },
+    "format": "PARSE_FORMAT_CSV",
+    "schema": {
+      "struct": {
+        "fields": [{
+          "name": "c1",
+          "dataType": {
+            "string": {
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "c2",
+          "dataType": {
+            "integer": {
+            }
+          },
+          "nullable": true
+        }]
+      }
+    },
+    "options": {
+      "header": "true"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin
new file mode 100644
index 00000000000..5f8bd50685c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/csv_from_dataset.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
new file mode 100644
index 00000000000..d6f992d09a5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.json
@@ -0,0 +1,38 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "parse": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": 
"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
+      }
+    },
+    "format": "PARSE_FORMAT_JSON",
+    "schema": {
+      "struct": {
+        "fields": [{
+          "name": "c1",
+          "dataType": {
+            "string": {
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "c2",
+          "dataType": {
+            "integer": {
+            }
+          },
+          "nullable": true
+        }]
+      }
+    },
+    "options": {
+      "allowsinglequotes": "true"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin
new file mode 100644
index 00000000000..0fce9d9ff8c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/json_from_dataset.proto.bin differ
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b51dbfa6602..9a8402a1e98 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -29,6 +29,7 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanResponse, SqlCommand}
 import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
+import org.apache.spark.connect.proto.Parse.ParseFormat
 import org.apache.spark.sql.{Column, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedExtractValue, UnresolvedFunction, UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
@@ -117,6 +118,7 @@ class SparkConnectPlanner(val session: SparkSession) {
         transformFrameMap(rel.getFrameMap)
       case proto.Relation.RelTypeCase.COLLECT_METRICS =>
         transformCollectMetrics(rel.getCollectMetrics)
+      case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
       case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
         throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
 
@@ -733,6 +735,30 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  private def transformParse(rel: proto.Parse): LogicalPlan = {
+    def dataFrameReader = {
+      val localMap = CaseInsensitiveMap[String](rel.getOptionsMap.asScala.toMap)
+      val reader = session.read
+      if (rel.hasSchema) {
+        DataTypeProtoConverter.toCatalystType(rel.getSchema) match {
+          case s: StructType => reader.schema(s)
+          case other => throw InvalidPlanInput(s"Invalid schema dataType $other")
+        }
+      }
+      localMap.foreach { case (key, value) => reader.option(key, value) }
+      reader
+    }
+    def ds: Dataset[String] = Dataset(session, transformRelation(rel.getInput))(Encoders.STRING)
+
+    rel.getFormat match {
+      case ParseFormat.PARSE_FORMAT_CSV =>
+        dataFrameReader.csv(ds).queryExecution.analyzed
+      case ParseFormat.PARSE_FORMAT_JSON =>
+        dataFrameReader.json(ds).queryExecution.analyzed
+      case _ => throw InvalidPlanInput("Does not support " + rel.getFormat.name())
+    }
+  }
+
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index e577749c3ed..81fa3916c5a 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xfb\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xa9\x13\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 
@@ -93,9 +93,12 @@ _TOSCHEMA = DESCRIPTOR.message_types_by_name["ToSchema"]
 _REPARTITIONBYEXPRESSION = DESCRIPTOR.message_types_by_name["RepartitionByExpression"]
 _FRAMEMAP = DESCRIPTOR.message_types_by_name["FrameMap"]
 _COLLECTMETRICS = DESCRIPTOR.message_types_by_name["CollectMetrics"]
+_PARSE = DESCRIPTOR.message_types_by_name["Parse"]
+_PARSE_OPTIONSENTRY = _PARSE.nested_types_by_name["OptionsEntry"]
 _JOIN_JOINTYPE = _JOIN.enum_types_by_name["JoinType"]
 _SETOPERATION_SETOPTYPE = _SETOPERATION.enum_types_by_name["SetOpType"]
 _AGGREGATE_GROUPTYPE = _AGGREGATE.enum_types_by_name["GroupType"]
+_PARSE_PARSEFORMAT = _PARSE.enum_types_by_name["ParseFormat"]
 Relation = _reflection.GeneratedProtocolMessageType(
     "Relation",
     (_message.Message,),
@@ -648,6 +651,27 @@ CollectMetrics = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(CollectMetrics)
 
+Parse = _reflection.GeneratedProtocolMessageType(
+    "Parse",
+    (_message.Message,),
+    {
+        "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+            "OptionsEntry",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _PARSE_OPTIONSENTRY,
+                "__module__": "spark.connect.relations_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.Parse.OptionsEntry)
+            },
+        ),
+        "DESCRIPTOR": _PARSE,
+        "__module__": "spark.connect.relations_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.Parse)
+    },
+)
+_sym_db.RegisterMessage(Parse)
+_sym_db.RegisterMessage(Parse.OptionsEntry)
+
 if _descriptor._USE_C_DESCRIPTORS == False:
 
     DESCRIPTOR._options = None
@@ -658,112 +682,120 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
     _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._options = None
     _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001"
+    _PARSE_OPTIONSENTRY._options = None
+    _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
     _RELATION._serialized_start = 165
-    _RELATION._serialized_end = 2592
-    _UNKNOWN._serialized_start = 2594
-    _UNKNOWN._serialized_end = 2603
-    _RELATIONCOMMON._serialized_start = 2605
-    _RELATIONCOMMON._serialized_end = 2696
-    _SQL._serialized_start = 2699
-    _SQL._serialized_end = 2833
-    _SQL_ARGSENTRY._serialized_start = 2778
-    _SQL_ARGSENTRY._serialized_end = 2833
-    _READ._serialized_start = 2836
-    _READ._serialized_end = 3332
-    _READ_NAMEDTABLE._serialized_start = 2978
-    _READ_NAMEDTABLE._serialized_end = 3039
-    _READ_DATASOURCE._serialized_start = 3042
-    _READ_DATASOURCE._serialized_end = 3319
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3239
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3297
-    _PROJECT._serialized_start = 3334
-    _PROJECT._serialized_end = 3451
-    _FILTER._serialized_start = 3453
-    _FILTER._serialized_end = 3565
-    _JOIN._serialized_start = 3568
-    _JOIN._serialized_end = 4039
-    _JOIN_JOINTYPE._serialized_start = 3831
-    _JOIN_JOINTYPE._serialized_end = 4039
-    _SETOPERATION._serialized_start = 4042
-    _SETOPERATION._serialized_end = 4521
-    _SETOPERATION_SETOPTYPE._serialized_start = 4358
-    _SETOPERATION_SETOPTYPE._serialized_end = 4472
-    _LIMIT._serialized_start = 4523
-    _LIMIT._serialized_end = 4599
-    _OFFSET._serialized_start = 4601
-    _OFFSET._serialized_end = 4680
-    _TAIL._serialized_start = 4682
-    _TAIL._serialized_end = 4757
-    _AGGREGATE._serialized_start = 4760
-    _AGGREGATE._serialized_end = 5342
-    _AGGREGATE_PIVOT._serialized_start = 5099
-    _AGGREGATE_PIVOT._serialized_end = 5210
-    _AGGREGATE_GROUPTYPE._serialized_start = 5213
-    _AGGREGATE_GROUPTYPE._serialized_end = 5342
-    _SORT._serialized_start = 5345
-    _SORT._serialized_end = 5505
-    _DROP._serialized_start = 5508
-    _DROP._serialized_end = 5649
-    _DEDUPLICATE._serialized_start = 5652
-    _DEDUPLICATE._serialized_end = 5823
-    _LOCALRELATION._serialized_start = 5825
-    _LOCALRELATION._serialized_end = 5914
-    _SAMPLE._serialized_start = 5917
-    _SAMPLE._serialized_end = 6190
-    _RANGE._serialized_start = 6193
-    _RANGE._serialized_end = 6338
-    _SUBQUERYALIAS._serialized_start = 6340
-    _SUBQUERYALIAS._serialized_end = 6454
-    _REPARTITION._serialized_start = 6457
-    _REPARTITION._serialized_end = 6599
-    _SHOWSTRING._serialized_start = 6602
-    _SHOWSTRING._serialized_end = 6744
-    _STATSUMMARY._serialized_start = 6746
-    _STATSUMMARY._serialized_end = 6838
-    _STATDESCRIBE._serialized_start = 6840
-    _STATDESCRIBE._serialized_end = 6921
-    _STATCROSSTAB._serialized_start = 6923
-    _STATCROSSTAB._serialized_end = 7024
-    _STATCOV._serialized_start = 7026
-    _STATCOV._serialized_end = 7122
-    _STATCORR._serialized_start = 7125
-    _STATCORR._serialized_end = 7262
-    _STATAPPROXQUANTILE._serialized_start = 7265
-    _STATAPPROXQUANTILE._serialized_end = 7429
-    _STATFREQITEMS._serialized_start = 7431
-    _STATFREQITEMS._serialized_end = 7556
-    _STATSAMPLEBY._serialized_start = 7559
-    _STATSAMPLEBY._serialized_end = 7868
-    _STATSAMPLEBY_FRACTION._serialized_start = 7760
-    _STATSAMPLEBY_FRACTION._serialized_end = 7859
-    _NAFILL._serialized_start = 7871
-    _NAFILL._serialized_end = 8005
-    _NADROP._serialized_start = 8008
-    _NADROP._serialized_end = 8142
-    _NAREPLACE._serialized_start = 8145
-    _NAREPLACE._serialized_end = 8441
-    _NAREPLACE_REPLACEMENT._serialized_start = 8300
-    _NAREPLACE_REPLACEMENT._serialized_end = 8441
-    _TODF._serialized_start = 8443
-    _TODF._serialized_end = 8531
-    _WITHCOLUMNSRENAMED._serialized_start = 8534
-    _WITHCOLUMNSRENAMED._serialized_end = 8773
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8706
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8773
-    _WITHCOLUMNS._serialized_start = 8775
-    _WITHCOLUMNS._serialized_end = 8894
-    _HINT._serialized_start = 8897
-    _HINT._serialized_end = 9029
-    _UNPIVOT._serialized_start = 9032
-    _UNPIVOT._serialized_end = 9359
-    _UNPIVOT_VALUES._serialized_start = 9289
-    _UNPIVOT_VALUES._serialized_end = 9348
-    _TOSCHEMA._serialized_start = 9361
-    _TOSCHEMA._serialized_end = 9467
-    _REPARTITIONBYEXPRESSION._serialized_start = 9470
-    _REPARTITIONBYEXPRESSION._serialized_end = 9673
-    _FRAMEMAP._serialized_start = 9675
-    _FRAMEMAP._serialized_end = 9800
-    _COLLECTMETRICS._serialized_start = 9803
-    _COLLECTMETRICS._serialized_end = 9939
+    _RELATION._serialized_end = 2638
+    _UNKNOWN._serialized_start = 2640
+    _UNKNOWN._serialized_end = 2649
+    _RELATIONCOMMON._serialized_start = 2651
+    _RELATIONCOMMON._serialized_end = 2742
+    _SQL._serialized_start = 2745
+    _SQL._serialized_end = 2879
+    _SQL_ARGSENTRY._serialized_start = 2824
+    _SQL_ARGSENTRY._serialized_end = 2879
+    _READ._serialized_start = 2882
+    _READ._serialized_end = 3378
+    _READ_NAMEDTABLE._serialized_start = 3024
+    _READ_NAMEDTABLE._serialized_end = 3085
+    _READ_DATASOURCE._serialized_start = 3088
+    _READ_DATASOURCE._serialized_end = 3365
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3285
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3343
+    _PROJECT._serialized_start = 3380
+    _PROJECT._serialized_end = 3497
+    _FILTER._serialized_start = 3499
+    _FILTER._serialized_end = 3611
+    _JOIN._serialized_start = 3614
+    _JOIN._serialized_end = 4085
+    _JOIN_JOINTYPE._serialized_start = 3877
+    _JOIN_JOINTYPE._serialized_end = 4085
+    _SETOPERATION._serialized_start = 4088
+    _SETOPERATION._serialized_end = 4567
+    _SETOPERATION_SETOPTYPE._serialized_start = 4404
+    _SETOPERATION_SETOPTYPE._serialized_end = 4518
+    _LIMIT._serialized_start = 4569
+    _LIMIT._serialized_end = 4645
+    _OFFSET._serialized_start = 4647
+    _OFFSET._serialized_end = 4726
+    _TAIL._serialized_start = 4728
+    _TAIL._serialized_end = 4803
+    _AGGREGATE._serialized_start = 4806
+    _AGGREGATE._serialized_end = 5388
+    _AGGREGATE_PIVOT._serialized_start = 5145
+    _AGGREGATE_PIVOT._serialized_end = 5256
+    _AGGREGATE_GROUPTYPE._serialized_start = 5259
+    _AGGREGATE_GROUPTYPE._serialized_end = 5388
+    _SORT._serialized_start = 5391
+    _SORT._serialized_end = 5551
+    _DROP._serialized_start = 5554
+    _DROP._serialized_end = 5695
+    _DEDUPLICATE._serialized_start = 5698
+    _DEDUPLICATE._serialized_end = 5869
+    _LOCALRELATION._serialized_start = 5871
+    _LOCALRELATION._serialized_end = 5960
+    _SAMPLE._serialized_start = 5963
+    _SAMPLE._serialized_end = 6236
+    _RANGE._serialized_start = 6239
+    _RANGE._serialized_end = 6384
+    _SUBQUERYALIAS._serialized_start = 6386
+    _SUBQUERYALIAS._serialized_end = 6500
+    _REPARTITION._serialized_start = 6503
+    _REPARTITION._serialized_end = 6645
+    _SHOWSTRING._serialized_start = 6648
+    _SHOWSTRING._serialized_end = 6790
+    _STATSUMMARY._serialized_start = 6792
+    _STATSUMMARY._serialized_end = 6884
+    _STATDESCRIBE._serialized_start = 6886
+    _STATDESCRIBE._serialized_end = 6967
+    _STATCROSSTAB._serialized_start = 6969
+    _STATCROSSTAB._serialized_end = 7070
+    _STATCOV._serialized_start = 7072
+    _STATCOV._serialized_end = 7168
+    _STATCORR._serialized_start = 7171
+    _STATCORR._serialized_end = 7308
+    _STATAPPROXQUANTILE._serialized_start = 7311
+    _STATAPPROXQUANTILE._serialized_end = 7475
+    _STATFREQITEMS._serialized_start = 7477
+    _STATFREQITEMS._serialized_end = 7602
+    _STATSAMPLEBY._serialized_start = 7605
+    _STATSAMPLEBY._serialized_end = 7914
+    _STATSAMPLEBY_FRACTION._serialized_start = 7806
+    _STATSAMPLEBY_FRACTION._serialized_end = 7905
+    _NAFILL._serialized_start = 7917
+    _NAFILL._serialized_end = 8051
+    _NADROP._serialized_start = 8054
+    _NADROP._serialized_end = 8188
+    _NAREPLACE._serialized_start = 8191
+    _NAREPLACE._serialized_end = 8487
+    _NAREPLACE_REPLACEMENT._serialized_start = 8346
+    _NAREPLACE_REPLACEMENT._serialized_end = 8487
+    _TODF._serialized_start = 8489
+    _TODF._serialized_end = 8577
+    _WITHCOLUMNSRENAMED._serialized_start = 8580
+    _WITHCOLUMNSRENAMED._serialized_end = 8819
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8752
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8819
+    _WITHCOLUMNS._serialized_start = 8821
+    _WITHCOLUMNS._serialized_end = 8940
+    _HINT._serialized_start = 8943
+    _HINT._serialized_end = 9075
+    _UNPIVOT._serialized_start = 9078
+    _UNPIVOT._serialized_end = 9405
+    _UNPIVOT_VALUES._serialized_start = 9335
+    _UNPIVOT_VALUES._serialized_end = 9394
+    _TOSCHEMA._serialized_start = 9407
+    _TOSCHEMA._serialized_end = 9513
+    _REPARTITIONBYEXPRESSION._serialized_start = 9516
+    _REPARTITIONBYEXPRESSION._serialized_end = 9719
+    _FRAMEMAP._serialized_start = 9721
+    _FRAMEMAP._serialized_end = 9846
+    _COLLECTMETRICS._serialized_start = 9849
+    _COLLECTMETRICS._serialized_end = 9985
+    _PARSE._serialized_start = 9988
+    _PARSE._serialized_end = 10376
+    _PARSE_OPTIONSENTRY._serialized_start = 3285
+    _PARSE_OPTIONSENTRY._serialized_end = 3343
+    _PARSE_PARSEFORMAT._serialized_start = 10277
+    _PARSE_PARSEFORMAT._serialized_end = 10365
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index d434451082e..edaab7bcb77 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -91,6 +91,7 @@ class Relation(google.protobuf.message.Message):
     REPARTITION_BY_EXPRESSION_FIELD_NUMBER: builtins.int
     FRAME_MAP_FIELD_NUMBER: builtins.int
     COLLECT_METRICS_FIELD_NUMBER: builtins.int
+    PARSE_FIELD_NUMBER: builtins.int
     FILL_NA_FIELD_NUMBER: builtins.int
     DROP_NA_FIELD_NUMBER: builtins.int
     REPLACE_FIELD_NUMBER: builtins.int
@@ -164,6 +165,8 @@ class Relation(google.protobuf.message.Message):
     @property
     def collect_metrics(self) -> global___CollectMetrics: ...
     @property
+    def parse(self) -> global___Parse: ...
+    @property
     def fill_na(self) -> global___NAFill:
         """NA functions"""
     @property
@@ -229,6 +232,7 @@ class Relation(google.protobuf.message.Message):
+        repartition_by_expression: global___RepartitionByExpression | None = ...,
         frame_map: global___FrameMap | None = ...,
         collect_metrics: global___CollectMetrics | None = ...,
+        parse: global___Parse | None = ...,
         fill_na: global___NAFill | None = ...,
         drop_na: global___NADrop | None = ...,
         replace: global___NAReplace | None = ...,
@@ -291,6 +295,8 @@ class Relation(google.protobuf.message.Message):
             b"local_relation",
             "offset",
             b"offset",
+            "parse",
+            b"parse",
             "project",
             b"project",
             "range",
@@ -384,6 +390,8 @@ class Relation(google.protobuf.message.Message):
             b"local_relation",
             "offset",
             b"offset",
+            "parse",
+            b"parse",
             "project",
             b"project",
             "range",
@@ -461,6 +469,7 @@ class Relation(google.protobuf.message.Message):
         "repartition_by_expression",
         "frame_map",
         "collect_metrics",
+        "parse",
         "fill_na",
         "drop_na",
         "replace",
@@ -2763,3 +2772,91 @@ class CollectMetrics(google.protobuf.message.Message):
     ) -> None: ...
 
 global___CollectMetrics = CollectMetrics
+
+class Parse(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class _ParseFormat:
+        ValueType = typing.NewType("ValueType", builtins.int)
+        V: typing_extensions.TypeAlias = ValueType
+
+    class _ParseFormatEnumTypeWrapper(
+        google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Parse._ParseFormat.ValueType],
+        builtins.type,
+    ):  # noqa: F821
+        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+        PARSE_FORMAT_UNSPECIFIED: Parse._ParseFormat.ValueType  # 0
+        PARSE_FORMAT_CSV: Parse._ParseFormat.ValueType  # 1
+        PARSE_FORMAT_JSON: Parse._ParseFormat.ValueType  # 2
+
+    class ParseFormat(_ParseFormat, metaclass=_ParseFormatEnumTypeWrapper): ...
+    PARSE_FORMAT_UNSPECIFIED: Parse.ParseFormat.ValueType  # 0
+    PARSE_FORMAT_CSV: Parse.ParseFormat.ValueType  # 1
+    PARSE_FORMAT_JSON: Parse.ParseFormat.ValueType  # 2
+
+    class OptionsEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        value: builtins.str
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", "value", b"value"]
+        ) -> None: ...
+
+    INPUT_FIELD_NUMBER: builtins.int
+    FORMAT_FIELD_NUMBER: builtins.int
+    SCHEMA_FIELD_NUMBER: builtins.int
+    OPTIONS_FIELD_NUMBER: builtins.int
+    @property
+    def input(self) -> global___Relation:
+        """(Required) Input relation to Parse. The input is expected to have 
single text column."""
+    format: global___Parse.ParseFormat.ValueType
+    """(Required) The expected format of the text."""
+    @property
+    def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+        """(Optional) DataType representing the schema. If not set, Spark will 
infer the schema."""
+    @property
+    def options(self) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+        """Options for the csv/json parser. The map key is case insensitive."""
+    def __init__(
+        self,
+        *,
+        input: global___Relation | None = ...,
+        format: global___Parse.ParseFormat.ValueType = ...,
+        schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        options: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_schema", b"_schema", "input", b"input", "schema", b"schema"
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_schema",
+            b"_schema",
+            "format",
+            b"format",
+            "input",
+            b"input",
+            "options",
+            b"options",
+            "schema",
+            b"schema",
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+    ) -> typing_extensions.Literal["schema"] | None: ...
+
+global___Parse = Parse

