This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 53f11b1087a [SPARK-42457][CONNECT] Adding SparkSession#read
53f11b1087a is described below

commit 53f11b1087a192af5efadc36af7413b0c21c87e1
Author: Zhen Li <zhenli...@users.noreply.github.com>
AuthorDate: Thu Feb 16 14:22:47 2023 -0400

    [SPARK-42457][CONNECT] Adding SparkSession#read
    
    ### What changes were proposed in this pull request?
    
    Add the SparkSession `read` API to read data into Spark via the Scala client:
    ```
    DataFrameReader.format(…).option("key", "value").schema(…).load()
    ```
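
    For illustration, a minimal usage sketch of the new API (assuming a Spark Connect `SparkSession` named `spark`; the path, options and schema are illustrative and mirror the E2E test added in this patch):
    ```
    // Read a semicolon-delimited CSV with an explicit schema via the Scala client.
    val df = spark.read
      .format("csv")
      .option("header", "true")
      .option("delimiter", ";")
      .schema("name STRING, age INT, job STRING")
      .load("/path/to/people.csv")
    ```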
    
    The following methods are skipped by the Scala Client on purpose:
    ```
    [info]   deprecated method json(org.apache.spark.api.java.JavaRDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info]   deprecated method json(org.apache.spark.rdd.RDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info]   method json(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info]   method csv(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    ```
    
    ### Why are the changes needed?
    To read data from CSV and other formats.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    E2E tests and golden tests.
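
    The golden tests exercise the new reader through `PlanGenerationTestSuite` and compare the generated plans against checked-in resources; e.g. (excerpted from the test suite change in this patch):
    ```
    test("read csv") {
      session.read.csv(testDataPath.resolve("people.csv").toString)
    }
    ```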
    
    Closes #40025 from zhenlineo/session-read.
    
    Authored-by: Zhen Li <zhenli...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 8d863e306cb105b715c8b9206a2bdd944dafa90b)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameReader.scala     | 409 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  29 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  66 +++-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  54 ++-
 .../sql/connect/client/CompatibilitySuite.scala    |   7 +-
 .../connect/client/util/IntegrationTestUtils.scala |   2 +-
 .../query-tests/explain-results/read.explain       |   1 +
 .../query-tests/explain-results/read_csv.explain   |   1 +
 .../query-tests/explain-results/read_json.explain  |   1 +
 .../query-tests/explain-results/read_orc.explain   |   1 +
 .../explain-results/read_parquet.explain           |   1 +
 .../query-tests/explain-results/read_path.explain  |   1 +
 .../query-tests/explain-results/read_table.explain |   1 +
 .../query-tests/explain-results/read_text.explain  |   1 +
 .../query-tests/explain-results/table.explain      |   1 +
 .../test/resources/query-tests/queries/read.json   |  13 +
 .../resources/query-tests/queries/read.proto.bin   |   4 +
 .../resources/query-tests/queries/read_csv.json    |   8 +
 .../query-tests/queries/read_csv.proto.bin         |   2 +
 .../resources/query-tests/queries/read_json.json   |   8 +
 .../query-tests/queries/read_json.proto.bin        |   2 +
 .../resources/query-tests/queries/read_orc.json    |   8 +
 .../query-tests/queries/read_orc.proto.bin         |   2 +
 .../query-tests/queries/read_parquet.json          |   8 +
 .../query-tests/queries/read_parquet.proto.bin     |   2 +
 .../resources/query-tests/queries/read_path.json   |  11 +
 .../query-tests/queries/read_path.proto.bin        |   3 +
 .../resources/query-tests/queries/read_table.json  |   7 +
 .../query-tests/queries/read_table.proto.bin       |   3 +
 .../resources/query-tests/queries/read_text.json   |   8 +
 .../query-tests/queries/read_text.proto.bin        |   2 +
 .../test/resources/query-tests/queries/table.json  |   7 +
 .../resources/query-tests/queries/table.proto.bin  |   3 +
 .../resources/query-tests/test-data/people.csv     |   3 +
 .../resources/query-tests/test-data/people.json    |   3 +
 .../resources/query-tests/test-data/people.txt     |   3 +
 .../test/resources/query-tests/test-data/users.orc | Bin 0 -> 547 bytes
 .../resources/query-tests/test-data/users.parquet  | Bin 0 -> 615 bytes
 38 files changed, 681 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
new file mode 100644
index 00000000000..5a486efee31
--- /dev/null
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.read` to access this.
+ *
+ * @since 3.4.0
+ */
+@Stable
+class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.4.0
+   */
+  def format(source: String): DataFrameReader = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.4.0
+   */
+  def schema(schema: StructType): DataFrameReader = {
+    if (schema != null) {
+      val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+      this.userSpecifiedSchema = Option(replaced)
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the schema here, the
+   * underlying data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * {{{
+   *   spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def schema(schemaString: String): DataFrameReader = {
+    schema(StructType.fromDDL(schemaString))
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: String): DataFrameReader = {
+    this.extraOptions = this.extraOptions + (key -> value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataFrameReader = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds input options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: java.util.Map[String, String]): DataFrameReader = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external
+   * key-value stores).
+   *
+   * @since 3.4.0
+   */
+  def load(): DataFrame = {
+    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by a
+   * local or distributed file system).
+   *
+   * @since 3.4.0
+   */
+  def load(path: String): DataFrame = {
+    // force invocation of `load(...varargs...)`
+    load(Seq(path): _*)
+  }
+
+  /**
+   * Loads input in as a `DataFrame`, for data sources that support multiple paths. Only works if
+   * the source is a HadoopFsRelationProvider.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def load(paths: String*): DataFrame = {
+    sparkSession.newDataset { builder =>
+      val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
+      assertSourceFormatSpecified()
+      dataSourceBuilder.setFormat(source)
+      userSpecifiedSchema.foreach(schema => dataSourceBuilder.setSchema(schema.toDDL))
+      extraOptions.foreach { case (k, v) =>
+        dataSourceBuilder.putOptions(k, v)
+      }
+      paths.foreach(path => dataSourceBuilder.addPaths(path))
+      builder.build()
+    }
+  }
+
+  /**
+   * Loads a JSON file and returns the results as a `DataFrame`.
+   *
+   * See the documentation on the overloaded `json()` method with varargs for more details.
+   *
+   * @since 3.4.0
+   */
+  def json(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    json(Seq(path): _*)
+  }
+
+  /**
+   * Loads JSON files and returns the results as a `DataFrame`.
+   *
+   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
+   * default. For JSON (one record per file), set the `multiLine` option to true.
+   *
+   * This function goes through the input once to determine the input schema. If you know the
+   * schema in advance, use the version that specifies the schema to avoid the extra scan.
+   *
+   * You can find the JSON-specific options for reading JSON files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def json(paths: String*): DataFrame = {
+    format("json").load(paths: _*)
+  }
+
+  /**
+   * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `csv()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def csv(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    csv(Seq(path): _*)
+  }
+
+  /**
+   * Loads CSV files and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can find the CSV-specific options for reading CSV files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
+
+  /**
+   * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
+   * other overloaded `parquet()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def parquet(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    parquet(Seq(path): _*)
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a `DataFrame`.
+   *
+   * Parquet-specific option(s) for reading Parquet files can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def parquet(paths: String*): DataFrame = {
+    format("parquet").load(paths: _*)
+  }
+
+  /**
+   * Loads an ORC file and returns the result as a `DataFrame`.
+   *
+   * @param path
+   *   input path
+   * @since 3.4.0
+   */
+  def orc(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    orc(Seq(path): _*)
+  }
+
+  /**
+   * Loads ORC files and returns the result as a `DataFrame`.
+   *
+   * ORC-specific option(s) for reading ORC files can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @param paths
+   *   input paths
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
+
+  /**
+   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a
+   * view, the returned DataFrame is simply the query plan of the view, which can either be a
+   * batch or streaming query plan.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table or view. If a database is
+   *   specified, it identifies the table/view from the database. Otherwise, it first attempts to
+   *   find a temporary view with the given name and then match the table/view from the current
+   *   database. Note that, the global temporary view database is also valid here.
+   * @since 3.4.0
+   */
+  def table(tableName: String): DataFrame = {
+    sparkSession.newDataset { builder =>
+      builder.getReadBuilder.getNamedTableBuilder.setUnparsedIdentifier(tableName)
+    }
+  }
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+   * "value", and followed by partitioned columns if there are any. See the documentation on the
+   * other overloaded `text()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def text(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    text(Seq(path): _*)
+  }
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+   * "value", and followed by partitioned columns if there are any. The text files must be encoded
+   * as UTF-8.
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.text("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().text("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can find the text-specific options for reading text files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param paths
+   *   input paths
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def text(paths: String*): DataFrame = format("text").load(paths: _*)
+
+  /**
+   * Loads text files and returns a [[Dataset]] of String. See the documentation on the other
+   * overloaded `textFile()` method for more details.
+   * @since 3.4.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    textFile(Seq(path): _*)
+  }
+
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value". The text files must be encoded as UTF-8.
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the text-specific options as specified in `DataFrameReader.text`.
+   *
+   * @param paths
+   *   input path
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def textFile(paths: String*): Dataset[String] = {
+    // scalastyle:off throwerror
+    // TODO: this method can be supported and should be included in the client API.
+    throw new NotImplementedError()
+    // scalastyle:on throwerror
+  }
+
+  private def assertSourceFormatSpecified(): Unit = {
+    if (source == null) {
+      throw new IllegalArgumentException("The source format must be specified.")
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = _
+
+  private var userSpecifiedSchema: Option[StructType] = None
+
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+
+}
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7d6597bf6d5..54871c99b56 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -62,6 +62,35 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
     builder.setSql(proto.SQL.newBuilder().setQuery(query))
   }
 
+  /**
+   * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
+   * `DataFrame`.
+   * {{{
+   *   sparkSession.read.parquet("/path/to/file.parquet")
+   *   sparkSession.read.schema(schema).json("/path/to/file.json")
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def read: DataFrameReader = new DataFrameReader(this)
+
+  /**
+   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a
+   * view, the returned DataFrame is simply the query plan of the view, which can either be a
+   * batch or streaming query plan.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table or view. If a database is
+   *   specified, it identifies the table/view from the database. Otherwise, it first attempts to
+   *   find a temporary view with the given name and then match the table/view from the current
+   *   database. Note that, the global temporary view database is also valid here.
+   * @since 3.4.0
+   */
+  def table(tableName: String): DataFrame = {
+    read.table(tableName)
+  }
+
   /**
    * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements in a
    * range from 0 to `end` (exclusive) with step value 1.
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 60ac25ab7ba..058ba1a8efc 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -20,12 +20,13 @@ import java.io.{ByteArrayOutputStream, PrintStream}
 
 import scala.collection.JavaConverters._
 
+import io.grpc.StatusRuntimeException
 import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.functions.udf
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 class ClientE2ETestSuite extends RemoteSparkSession {
 
@@ -66,6 +67,67 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     }
   }
 
+  test("read") {
+    val testDataPath = java.nio.file.Paths
+      .get(
+        IntegrationTestUtils.sparkHome,
+        "connector",
+        "connect",
+        "common",
+        "src",
+        "test",
+        "resources",
+        "query-tests",
+        "test-data",
+        "people.csv")
+      .toAbsolutePath
+    val df = spark.read
+      .format("csv")
+      .option("path", testDataPath.toString)
+      .options(Map("header" -> "true", "delimiter" -> ";"))
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .load()
+    val array = df.collectResult().toArray
+    assert(array.length == 2)
+    assert(array(0).getString(0) == "Jorge")
+    assert(array(0).getInt(1) == 30)
+    assert(array(0).getString(2) == "Developer")
+  }
+
+  test("read path collision") {
+    val testDataPath = java.nio.file.Paths
+      .get(
+        IntegrationTestUtils.sparkHome,
+        "connector",
+        "connect",
+        "common",
+        "src",
+        "test",
+        "resources",
+        "query-tests",
+        "test-data",
+        "people.csv")
+      .toAbsolutePath
+    val df = spark.read
+      .format("csv")
+      .option("path", testDataPath.toString)
+      .options(Map("header" -> "true", "delimiter" -> ";"))
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .csv(testDataPath.toString)
+    // Fails because the path cannot be provided both via an option and the load method (csv).
+    assertThrows[StatusRuntimeException] {
+      df.collectResult().toArray
+    }
+  }
+
   // TODO test large result when we can create table or view
   // test("test spark large result")
   private def captureStdOut(block: => Unit): String = {
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 416a9b33886..f8bae27cc0e 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.connect.client.SparkConnectClient
-import org.apache.spark.sql.types.{MapType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types._
 
 // scalastyle:off
 /**
@@ -82,6 +82,17 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
 
   protected val queryFilePath: Path = baseResourcePath.resolve("queries")
 
+  // A relative path to /connector/connect/server, used by `ProtoToParsedPlanTestSuite` to run
+  // with the datasource.
+  protected val testDataPath: Path = java.nio.file.Paths.get(
+    "../",
+    "common",
+    "src",
+    "test",
+    "resources",
+    "query-tests",
+    "test-data")
+
   private val printer = JsonFormat.printer()
 
   private var session: SparkSession = _
@@ -197,6 +208,47 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     session.range(1, 10, 1, 2)
   }
 
+  test("read") {
+    session.read
+      .format("csv")
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .option("header", "true")
+      .options(Map("delimiter" -> ";"))
+      .load(testDataPath.resolve("people.csv").toString)
+  }
+
+  test("read json") {
+    session.read.json(testDataPath.resolve("people.json").toString)
+  }
+
+  test("read csv") {
+    session.read.csv(testDataPath.resolve("people.csv").toString)
+  }
+
+  test("read parquet") {
+    session.read.parquet(testDataPath.resolve("users.parquet").toString)
+  }
+
+  test("read orc") {
+    session.read.orc(testDataPath.resolve("users.orc").toString)
+  }
+
+  test("read table") {
+    session.read.table("myTable")
+  }
+
+  test("table") {
+    session.table("myTable")
+  }
+
+  test("read text") {
+    session.read.text(testDataPath.resolve("people.txt").toString)
+  }
+
   /* Dataset API */
   test("select") {
     simple.select(fn.col("id"))
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 867995efc2e..fa2cb18cda2 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -75,12 +75,16 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
       // TODO(SPARK-42175) Add the Dataset object definition
       // IncludeByName("org.apache.spark.sql.Dataset$"),
       IncludeByName("org.apache.spark.sql.DataFrame"),
+      IncludeByName("org.apache.spark.sql.DataFrameReader"),
       IncludeByName("org.apache.spark.sql.SparkSession"),
       IncludeByName("org.apache.spark.sql.SparkSession$")) ++ 
includeImplementedMethods(clientJar)
     val excludeRules = Seq(
       // Filter unsupported rules:
      // Two sql overloading methods are marked experimental in the API and skipped in the client.
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
+      // Deprecated json methods and RDD related methods are skipped in the client.
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       // Skip all shaded dependencies in the client.
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
@@ -130,7 +134,8 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
      // TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \
       //  the Dataset methods, as too many overload methods are missing.
       // "org.apache.spark.sql.Dataset",
-      "org.apache.spark.sql.SparkSession")
+      "org.apache.spark.sql.SparkSession",
+      "org.apache.spark.sql.DataFrameReader")
 
    val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
     clsNames
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
index f0ae4cad679..2725422c299 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
@@ -25,7 +25,7 @@ object IntegrationTestUtils {
   // System properties used for testing and debugging
   private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
 
-  private[connect] lazy val sparkHome: String = {
+  private[sql] lazy val sparkHome: String = {
    if (!(sys.props.contains("spark.test.home") || sys.env.contains("SPARK_HOME"))) {
       fail("spark.test.home or SPARK_HOME is not set.")
     }
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain
new file mode 100644
index 00000000000..30e5ec4542c
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] csv
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain
new file mode 100644
index 00000000000..069e76a8154
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain
@@ -0,0 +1 @@
+Relation [none#0] csv
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain
new file mode 100644
index 00000000000..ead927c8f24
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain
@@ -0,0 +1 @@
+Relation [none#0L,none#1] json
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain
new file mode 100644
index 00000000000..9a8b1aab953
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] orc
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain
new file mode 100644
index 00000000000..9fd4f43aeee
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] parquet
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain
new file mode 100644
index 00000000000..70393afccb2
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1] csv
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain
new file mode 100644
index 00000000000..0ec9812e3b9
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain
@@ -0,0 +1 @@
+'UnresolvedRelation [myTable], [], false
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain
new file mode 100644
index 00000000000..3e78575dac6
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain
@@ -0,0 +1 @@
+Relation [none#0] text
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain
new file mode 100644
index 00000000000..0ec9812e3b9
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain
@@ -0,0 +1 @@
+'UnresolvedRelation [myTable], [], false
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read.json 
b/connector/connect/common/src/test/resources/query-tests/queries/read.json
new file mode 100644
index 00000000000..f5ffb3c961b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read.json
@@ -0,0 +1,13 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "csv",
+      "schema": "name STRING,age INT,job STRING",
+      "options": {
+        "header": "true",
+        "delimiter": ";"
+      },
+      "paths": 
["../common/src/test/resources/query-tests/test-data/people.csv"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin
new file mode 100644
index 00000000000..ede57af1130
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin
@@ -0,0 +1,4 @@
+��
+csvname STRING,age INT,job STRING
+headertrue
+       
delimiter;"=../common/src/test/resources/query-tests/test-data/people.csv
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json 
b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json
new file mode 100644
index 00000000000..6095d200f62
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "csv",
+      "paths": 
["../common/src/test/resources/query-tests/test-data/people.csv"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin
new file mode 100644
index 00000000000..9bbab0fe2af
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin
@@ -0,0 +1,2 @@
+FD
+csv"=../common/src/test/resources/query-tests/test-data/people.csv
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_json.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_json.json
new file mode 100644
index 00000000000..2e2f83d1191
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_json.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "json",
+      "paths": 
["../common/src/test/resources/query-tests/test-data/people.json"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin
new file mode 100644
index 00000000000..22557aca38d
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin
@@ -0,0 +1,2 @@
+HF
+json">../common/src/test/resources/query-tests/test-data/people.json
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json 
b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json
new file mode 100644
index 00000000000..caa6c951d3e
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "orc",
+      "paths": ["../common/src/test/resources/query-tests/test-data/users.orc"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin
new file mode 100644
index 00000000000..95d07fd0a9b
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin
@@ -0,0 +1,2 @@
+EC
+orc"<../common/src/test/resources/query-tests/test-data/users.orc
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json
new file mode 100644
index 00000000000..05d799fd9cb
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "parquet",
+      "paths": 
["../common/src/test/resources/query-tests/test-data/users.parquet"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin
new file mode 100644
index 00000000000..5fc1954428f
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin
@@ -0,0 +1,2 @@
+MK
+parquet"@../common/src/test/resources/query-tests/test-data/users.parquet
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_path.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_path.json
new file mode 100644
index 00000000000..c3fc8132a3b
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_path.json
@@ -0,0 +1,11 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "csv",
+      "schema": "name STRING,age INT",
+      "options": {
+        "path": "../common/src/test/resources/query-tests/test-data/people.csv"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin
new file mode 100644
index 00000000000..01787253c42
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin
@@ -0,0 +1,3 @@
+ca
+csvname STRING,age INTE
+path=../common/src/test/resources/query-tests/test-data/people.csv
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_table.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_table.json
new file mode 100644
index 00000000000..634310c27ff
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_table.json
@@ -0,0 +1,7 @@
+{
+  "read": {
+    "namedTable": {
+      "unparsedIdentifier": "myTable"
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin
new file mode 100644
index 00000000000..f6ffaf988e7
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin
@@ -0,0 +1,3 @@
+
+       
+myTable
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_text.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_text.json
new file mode 100644
index 00000000000..8dc1e26a70e
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_text.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "text",
+      "paths": 
["../common/src/test/resources/query-tests/test-data/people.txt"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin
new file mode 100644
index 00000000000..97d167f6e2e
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin
@@ -0,0 +1,2 @@
+GE
+text"=../common/src/test/resources/query-tests/test-data/people.txt
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/table.json 
b/connector/connect/common/src/test/resources/query-tests/queries/table.json
new file mode 100644
index 00000000000..634310c27ff
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/table.json
@@ -0,0 +1,7 @@
+{
+  "read": {
+    "namedTable": {
+      "unparsedIdentifier": "myTable"
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/table.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/table.proto.bin
new file mode 100644
index 00000000000..f6ffaf988e7
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/table.proto.bin
@@ -0,0 +1,3 @@
+
+       
+myTable
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/test-data/people.csv 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.csv
new file mode 100644
index 00000000000..7fe5adba93d
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.csv
@@ -0,0 +1,3 @@
+name;age;job
+Jorge;30;Developer
+Bob;32;Developer
diff --git 
a/connector/connect/common/src/test/resources/query-tests/test-data/people.json 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.json
new file mode 100644
index 00000000000..50a859cbd7e
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.json
@@ -0,0 +1,3 @@
+{"name":"Michael"}
+{"name":"Andy", "age":30}
+{"name":"Justin", "age":19}
diff --git 
a/connector/connect/common/src/test/resources/query-tests/test-data/people.txt 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.txt
new file mode 100644
index 00000000000..3bcace4a44c
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/test-data/people.txt
@@ -0,0 +1,3 @@
+Michael, 29
+Andy, 30
+Justin, 19
diff --git 
a/connector/connect/common/src/test/resources/query-tests/test-data/users.orc 
b/connector/connect/common/src/test/resources/query-tests/test-data/users.orc
new file mode 100644
index 00000000000..12478a5d03c
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/test-data/users.orc 
differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet
 
b/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet
new file mode 100644
index 00000000000..aa527338c43
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet
 differ

