This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 53f11b1087a  [SPARK-42457][CONNECT] Adding SparkSession#read
53f11b1087a is described below

commit 53f11b1087a192af5efadc36af7413b0c21c87e1
Author: Zhen Li <zhenli...@users.noreply.github.com>
AuthorDate: Thu Feb 16 14:22:47 2023 -0400

    [SPARK-42457][CONNECT] Adding SparkSession#read

    ### What changes were proposed in this pull request?
    Add the SparkSession read API for loading data into Spark via the Scala client:
    ```
    DataFrameReader.format(...).option("key", "value").schema(...).load()
    ```
    The following methods are skipped by the Scala client on purpose:
    ```
    [info] deprecated method json(org.apache.spark.api.java.JavaRDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info] deprecated method json(org.apache.spark.rdd.RDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info] method json(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    [info] method csv(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
    ```

    ### Why are the changes needed?
    To read data in CSV and other formats into Spark.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    E2E and golden tests.

    Closes #40025 from zhenlineo/session-read.

    Authored-by: Zhen Li <zhenli...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 8d863e306cb105b715c8b9206a2bdd944dafa90b)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
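For orientation, the added reader mirrors the builder-style API of the server-side `DataFrameReader`. A minimal usage sketch, assuming an already-connected Scala client `SparkSession` named `spark` and a hypothetical CSV at `/tmp/people.csv`:
```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical input; any semicolon-delimited CSV with a header row works.
val people = spark.read
  .format("csv")
  .option("header", "true")
  .options(Map("delimiter" -> ";"))
  .schema(StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType) ::
      StructField("job", StringType) :: Nil))
  .load("/tmp/people.csv")
```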
 .../org/apache/spark/sql/DataFrameReader.scala     | 409 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  29 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  66 +++-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  54 ++-
 .../sql/connect/client/CompatibilitySuite.scala    |   7 +-
 .../connect/client/util/IntegrationTestUtils.scala |   2 +-
 .../query-tests/explain-results/read.explain       |   1 +
 .../query-tests/explain-results/read_csv.explain   |   1 +
 .../query-tests/explain-results/read_json.explain  |   1 +
 .../query-tests/explain-results/read_orc.explain   |   1 +
 .../explain-results/read_parquet.explain           |   1 +
 .../query-tests/explain-results/read_path.explain  |   1 +
 .../query-tests/explain-results/read_table.explain |   1 +
 .../query-tests/explain-results/read_text.explain  |   1 +
 .../query-tests/explain-results/table.explain      |   1 +
 .../test/resources/query-tests/queries/read.json   |  13 +
 .../resources/query-tests/queries/read.proto.bin   |   4 +
 .../resources/query-tests/queries/read_csv.json    |   8 +
 .../query-tests/queries/read_csv.proto.bin         |   2 +
 .../resources/query-tests/queries/read_json.json   |   8 +
 .../query-tests/queries/read_json.proto.bin        |   2 +
 .../resources/query-tests/queries/read_orc.json    |   8 +
 .../query-tests/queries/read_orc.proto.bin         |   2 +
 .../query-tests/queries/read_parquet.json          |   8 +
 .../query-tests/queries/read_parquet.proto.bin     |   2 +
 .../resources/query-tests/queries/read_path.json   |  11 +
 .../query-tests/queries/read_path.proto.bin        |   3 +
 .../resources/query-tests/queries/read_table.json  |   7 +
 .../query-tests/queries/read_table.proto.bin       |   3 +
 .../resources/query-tests/queries/read_text.json   |   8 +
 .../query-tests/queries/read_text.proto.bin        |   2 +
 .../test/resources/query-tests/queries/table.json  |   7 +
 .../resources/query-tests/queries/table.proto.bin  |   3 +
 .../resources/query-tests/test-data/people.csv     |   3 +
 .../resources/query-tests/test-data/people.json    |   3 +
 .../resources/query-tests/test-data/people.txt     |   3 +
 .../test/resources/query-tests/test-data/users.orc | Bin 0 -> 547 bytes
 .../resources/query-tests/test-data/users.parquet  | Bin 0 -> 615 bytes
 38 files changed, 681 insertions(+), 5 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
new file mode 100644
index 00000000000..5a486efee31
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
+ * key-value stores, etc). Use `SparkSession.read` to access this.
+ *
+ * @since 3.4.0
+ */
+@Stable
+class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 3.4.0
+   */
+  def format(source: String): DataFrameReader = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can skip
+   * the schema inference step, and thus speed up data loading.
+   *
+   * @since 3.4.0
+   */
+  def schema(schema: StructType): DataFrameReader = {
+    if (schema != null) {
+      val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
+      this.userSpecifiedSchema = Option(replaced)
+    }
+    this
+  }
+
+  /**
+   * Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON)
+   * can infer the input schema automatically from data. By specifying the schema here, the
+   * underlying data source can skip the schema inference step, and thus speed up data loading.
+   *
+   * {{{
+   *   spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def schema(schemaString: String): DataFrameReader = {
+    schema(StructType.fromDDL(schemaString))
+  }
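Both `schema` overloads end up in the same place: the DDL string is parsed with `StructType.fromDDL` before being recorded as the user-specified schema. A quick sketch of the equivalence, assuming an existing session `spark`:
```scala
import org.apache.spark.sql.types.StructType

val ddl = "a INT, b STRING, c DOUBLE"
// These two readers carry the same user-specified schema.
val byDdl    = spark.read.schema(ddl)
val byStruct = spark.read.schema(StructType.fromDDL(ddl))
```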
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: String): DataFrameReader = {
+    this.extraOptions = this.extraOptions + (key -> value)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataFrameReader = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds input options for the underlying data source.
+   *
+   * All options are maintained in a case-insensitive way in terms of key names. If a new option
+   * has the same key case-insensitively, it will override the existing option.
+   *
+   * @since 3.4.0
+   */
+  def options(options: java.util.Map[String, String]): DataFrameReader = {
+    this.options(options.asScala)
+    this
+  }
+
+  /**
+   * Loads input as a `DataFrame`, for data sources that don't require a path (e.g. external
+   * key-value stores).
+   *
+   * @since 3.4.0
+   */
+  def load(): DataFrame = {
+    load(Seq.empty: _*) // force invocation of `load(...varargs...)`
+  }
+
+  /**
+   * Loads input as a `DataFrame`, for data sources that require a path (e.g. data backed by a
+   * local or distributed file system).
+   *
+   * @since 3.4.0
+   */
+  def load(path: String): DataFrame = {
+    // force invocation of `load(...varargs...)`
+    load(Seq(path): _*)
+  }
+
+  /**
+   * Loads input as a `DataFrame`, for data sources that support multiple paths. Only works if
+   * the source is a HadoopFsRelationProvider.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def load(paths: String*): DataFrame = {
+    sparkSession.newDataset { builder =>
+      val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
+      assertSourceFormatSpecified()
+      dataSourceBuilder.setFormat(source)
+      userSpecifiedSchema.foreach(schema => dataSourceBuilder.setSchema(schema.toDDL))
+      extraOptions.foreach { case (k, v) =>
+        dataSourceBuilder.putOptions(k, v)
+      }
+      paths.foreach(path => dataSourceBuilder.addPaths(path))
+      builder.build()
+    }
+  }
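Because the accumulated options live in a `CaseInsensitiveMap`, a later `option` call whose key differs only in case replaces the earlier entry, exactly as the doc comments above state. A small illustrative sketch (session `spark` and path hypothetical):
```scala
val reader = spark.read
  .format("csv")
  .option("Header", "false")
  .option("header", "true") // replaces "Header" -> "false"; last write wins
// reader.load("/tmp/data.csv") would see header=true
```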
+
+  /**
+   * Loads a JSON file and returns the results as a `DataFrame`.
+   *
+   * See the documentation on the overloaded `json()` method with varargs for more details.
+   *
+   * @since 3.4.0
+   */
+  def json(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    json(Seq(path): _*)
+  }
+
+  /**
+   * Loads JSON files and returns the results as a `DataFrame`.
+   *
+   * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
+   * default. For JSON (one record per file), set the `multiLine` option to true.
+   *
+   * This function goes through the input once to determine the input schema. If you know the
+   * schema in advance, use the version that specifies the schema to avoid the extra scan.
+   *
+   * You can find the JSON-specific options for reading JSON files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def json(paths: String*): DataFrame = {
+    format("json").load(paths: _*)
+  }
+
+  /**
+   * Loads a CSV file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `csv()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def csv(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    csv(Seq(path): _*)
+  }
+
+  /**
+   * Loads CSV files and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable the `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can find the CSV-specific options for reading CSV files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def csv(paths: String*): DataFrame = format("csv").load(paths: _*)
+
+  /**
+   * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
+   * other overloaded `parquet()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def parquet(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    parquet(Seq(path): _*)
+  }
+
+  /**
+   * Loads Parquet files, returning the result as a `DataFrame`.
+   *
+   * Parquet-specific option(s) for reading Parquet files can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def parquet(paths: String*): DataFrame = {
+    format("parquet").load(paths: _*)
+  }
+
+  /**
+   * Loads an ORC file and returns the result as a `DataFrame`.
+   *
+   * @param path
+   *   input path
+   * @since 3.4.0
+   */
+  def orc(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    orc(Seq(path): _*)
+  }
+
+  /**
+   * Loads ORC files and returns the result as a `DataFrame`.
+   *
+   * ORC-specific option(s) for reading ORC files can be found in <a href=
+   * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
+   * Source Option</a> in the version you use.
+   *
+   * @param paths
+   *   input paths
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orc(paths: String*): DataFrame = format("orc").load(paths: _*)
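As the `json` scaladoc above notes, supplying a schema up front skips the inference pass over the input. A brief sketch (session `spark` and path hypothetical):
```scala
// With an explicit schema, the server does not need to scan the input a
// first time just to infer column types.
val people = spark.read
  .schema("name STRING, age INT")
  .json("/tmp/people.json")
```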
+
+  /**
+   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a
+   * view, the returned DataFrame is simply the query plan of the view, which can either be a
+   * batch or streaming query plan.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table or view. If a database
+   *   is specified, it identifies the table/view from the database. Otherwise, it first attempts
+   *   to find a temporary view with the given name and then matches the table/view from the
+   *   current database. Note that the global temporary view database is also valid here.
+   * @since 3.4.0
+   */
+  def table(tableName: String): DataFrame = {
+    sparkSession.newDataset { builder =>
+      builder.getReadBuilder.getNamedTableBuilder.setUnparsedIdentifier(tableName)
+    }
+  }
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+   * "value", followed by partitioned columns if there are any. See the documentation on the
+   * other overloaded `text()` method for more details.
+   *
+   * @since 3.4.0
+   */
+  def text(path: String): DataFrame = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    text(Seq(path): _*)
+  }
+
+  /**
+   * Loads text files and returns a `DataFrame` whose schema starts with a string column named
+   * "value", followed by partitioned columns if there are any. The text files must be encoded
+   * as UTF-8.
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.text("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().text("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can find the text-specific options for reading text files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param paths
+   *   input paths
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def text(paths: String*): DataFrame = format("text").load(paths: _*)
+
+  /**
+   * Loads text files and returns a [[Dataset]] of String. See the documentation on the other
+   * overloaded `textFile()` method for more details.
+   * @since 3.4.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    // This method ensures that calls that explicitly need a single argument work, see SPARK-16009
+    textFile(Seq(path): _*)
+  }
+
+  /**
+   * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value". The text files must be encoded as UTF-8.
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * By default, each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the text-specific options as specified in `DataFrameReader.text`.
+   *
+   * @param paths
+   *   input paths
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def textFile(paths: String*): Dataset[String] = {
+    // scalastyle:off throwerror
+    // TODO: this method can be supported and should be included in the client API.
+    throw new NotImplementedError()
+    // scalastyle:on throwerror
+  }
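Note the difference between the two text readers: `text` yields a `DataFrame` with a single `value` column, while `textFile` (not yet implemented in this client, per the TODO above) would yield a `Dataset[String]`. A one-line sketch of the supported variant, path hypothetical:
```scala
val lines = spark.read.text("/tmp/people.txt") // schema: value: string, one row per line
```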
+
+  private def assertSourceFormatSpecified(): Unit = {
+    if (source == null) {
+      throw new IllegalArgumentException("The source format must be specified.")
+    }
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = _
+
+  private var userSpecifiedSchema: Option[StructType] = None
+
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
+
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7d6597bf6d5..54871c99b56 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -62,6 +62,35 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
     builder.setSql(proto.SQL.newBuilder().setQuery(query))
   }
 
+  /**
+   * Returns a [[DataFrameReader]] that can be used to read non-streaming data as a
+   * `DataFrame`.
+   * {{{
+   *   sparkSession.read.parquet("/path/to/file.parquet")
+   *   sparkSession.read.schema(schema).json("/path/to/file.json")
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def read: DataFrameReader = new DataFrameReader(this)
+
+  /**
+   * Returns the specified table/view as a `DataFrame`. If it's a table, it must support batch
+   * reading and the returned DataFrame is the batch scan query plan of this table. If it's a
+   * view, the returned DataFrame is simply the query plan of the view, which can either be a
+   * batch or streaming query plan.
+   *
+   * @param tableName
+   *   is either a qualified or unqualified name that designates a table or view. If a database
+   *   is specified, it identifies the table/view from the database. Otherwise, it first attempts
+   *   to find a temporary view with the given name and then matches the table/view from the
+   *   current database. Note that the global temporary view database is also valid here.
+   * @since 3.4.0
+   */
+  def table(tableName: String): DataFrame = {
+    read.table(tableName)
+  }
+
   /**
    * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
    * range from 0 to `end` (exclusive) with step value 1.
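The two `SparkSession` entry points compose with everything above; `table` is just shorthand for `read.table`. A compact sketch, with the table name and path hypothetical:
```scala
val users  = spark.read.parquet("/tmp/users.parquet") // explicit reader
val orders = spark.table("orders")                    // same as spark.read.table("orders")
```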
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 60ac25ab7ba..058ba1a8efc 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -20,12 +20,13 @@ import java.io.{ByteArrayOutputStream, PrintStream}
 
 import scala.collection.JavaConverters._
 
+import io.grpc.StatusRuntimeException
 import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
-import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
 import org.apache.spark.sql.functions.udf
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 class ClientE2ETestSuite extends RemoteSparkSession {
 
@@ -66,6 +67,67 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     }
   }
 
+  test("read") {
+    val testDataPath = java.nio.file.Paths
+      .get(
+        IntegrationTestUtils.sparkHome,
+        "connector",
+        "connect",
+        "common",
+        "src",
+        "test",
+        "resources",
+        "query-tests",
+        "test-data",
+        "people.csv")
+      .toAbsolutePath
+    val df = spark.read
+      .format("csv")
+      .option("path", testDataPath.toString)
+      .options(Map("header" -> "true", "delimiter" -> ";"))
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .load()
+    val array = df.collectResult().toArray
+    assert(array.length == 2)
+    assert(array(0).getString(0) == "Jorge")
+    assert(array(0).getInt(1) == 30)
+    assert(array(0).getString(2) == "Developer")
+  }
+
+  test("read path collision") {
+    val testDataPath = java.nio.file.Paths
+      .get(
+        IntegrationTestUtils.sparkHome,
+        "connector",
+        "connect",
+        "common",
+        "src",
+        "test",
+        "resources",
+        "query-tests",
+        "test-data",
+        "people.csv")
+      .toAbsolutePath
+    val df = spark.read
+      .format("csv")
+      .option("path", testDataPath.toString)
+      .options(Map("header" -> "true", "delimiter" -> ";"))
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .csv(testDataPath.toString)
+    // Fails because the path cannot be provided both via an option and the load method (csv).
+    assertThrows[StatusRuntimeException] {
+      df.collectResult().toArray
+    }
+  }
+
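The collision test encodes the server-side rule stated in its comment: a path may arrive either as the `path` option or as a `load`-style argument, not both. A condensed sketch of the rejected combination (session `spark` and path hypothetical):
```scala
// Both lines below supply a path; execution fails with a StatusRuntimeException
// only when the plan is sent to the server, i.e. on collect.
val bad = spark.read
  .format("csv")
  .option("path", "/tmp/people.csv")
  .csv("/tmp/people.csv")
```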
+  // TODO test large result when we can create table or view
+  // test("test spark large result")
+
   private def captureStdOut(block: => Unit): String = {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 416a9b33886..f8bae27cc0e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.connect.client.SparkConnectClient
-import org.apache.spark.sql.types.{MapType, MetadataBuilder, StringType, StructType}
+import org.apache.spark.sql.types._
 
 // scalastyle:off
 /**
@@ -82,6 +82,17 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
 
   protected val queryFilePath: Path = baseResourcePath.resolve("queries")
 
+  // A relative path to /connector/connect/server, used by `ProtoToParsedPlanTestSuite` to run
+  // with the datasource.
+  protected val testDataPath: Path = java.nio.file.Paths.get(
+    "../",
+    "common",
+    "src",
+    "test",
+    "resources",
+    "query-tests",
+    "test-data")
+
   private val printer = JsonFormat.printer()
 
   private var session: SparkSession = _
@@ -197,6 +208,47 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     session.range(1, 10, 1, 2)
   }
 
+  test("read") {
+    session.read
+      .format("csv")
+      .schema(
+        StructType(
+          StructField("name", StringType) ::
+            StructField("age", IntegerType) ::
+            StructField("job", StringType) :: Nil))
+      .option("header", "true")
+      .options(Map("delimiter" -> ";"))
+      .load(testDataPath.resolve("people.csv").toString)
+  }
+
+  test("read json") {
+    session.read.json(testDataPath.resolve("people.json").toString)
+  }
+
+  test("read csv") {
+    session.read.csv(testDataPath.resolve("people.csv").toString)
+  }
+
+  test("read parquet") {
+    session.read.parquet(testDataPath.resolve("users.parquet").toString)
+  }
+
+  test("read orc") {
+    session.read.orc(testDataPath.resolve("users.orc").toString)
+  }
+
+  test("read table") {
+    session.read.table("myTable")
+  }
+
+  test("table") {
+    session.table("myTable")
+  }
+
+  test("read text") {
+    session.read.text(testDataPath.resolve("people.txt").toString)
+  }
+
   /* Dataset API */
   test("select") {
     simple.select(fn.col("id"))
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 867995efc2e..fa2cb18cda2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -75,12 +75,16 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
       // TODO(SPARK-42175) Add the Dataset object definition
       // IncludeByName("org.apache.spark.sql.Dataset$"),
       IncludeByName("org.apache.spark.sql.DataFrame"),
+      IncludeByName("org.apache.spark.sql.DataFrameReader"),
       IncludeByName("org.apache.spark.sql.SparkSession"),
       IncludeByName("org.apache.spark.sql.SparkSession$")) ++
       includeImplementedMethods(clientJar)
     val excludeRules = Seq(
       // Filter unsupported rules:
       // Two sql overloading methods are marked experimental in the API and skipped in the client.
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
+      // Deprecated json methods and RDD related methods are skipped in the client.
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.csv"),
       // Skip all shaded dependencies in the client.
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
@@ -130,7 +134,8 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
       // TODO(SPARK-42175) Add all overloading methods. Temporarily mute compatibility check for \
       // the Dataset methods, as too many overload methods are missing.
       // "org.apache.spark.sql.Dataset",
-      "org.apache.spark.sql.SparkSession")
+      "org.apache.spark.sql.SparkSession",
+      "org.apache.spark.sql.DataFrameReader")
 
     val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
     clsNames
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
index f0ae4cad679..2725422c299 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala
@@ -25,7 +25,7 @@ object IntegrationTestUtils {
 
   // System properties used for testing and debugging
   private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
 
-  private[connect] lazy val sparkHome: String = {
+  private[sql] lazy val sparkHome: String = {
     if (!(sys.props.contains("spark.test.home") || sys.env.contains("SPARK_HOME"))) {
       fail("spark.test.home or SPARK_HOME is not set.")
     }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain
new file mode 100644
index 00000000000..30e5ec4542c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] csv
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain
new file mode 100644
index 00000000000..069e76a8154
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_csv.explain
@@ -0,0 +1 @@
+Relation [none#0] csv
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain
new file mode 100644
index 00000000000..ead927c8f24
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_json.explain
@@ -0,0 +1 @@
+Relation [none#0L,none#1] json
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain
new file mode 100644
index 00000000000..9a8b1aab953
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_orc.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] orc
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain
new file mode 100644
index 00000000000..9fd4f43aeee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_parquet.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1,none#2] parquet
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain
new file mode 100644
index 00000000000..70393afccb2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_path.explain
@@ -0,0 +1 @@
+Relation [none#0,none#1] csv
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain
new file mode 100644
index 00000000000..0ec9812e3b9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_table.explain
@@ -0,0 +1 @@
+'UnresolvedRelation [myTable], [], false
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain
new file mode 100644
index 00000000000..3e78575dac6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_text.explain
@@ -0,0 +1 @@
+Relation [none#0] text
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain
new file mode 100644
index 00000000000..0ec9812e3b9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/table.explain
@@ -0,0 +1 @@
+'UnresolvedRelation [myTable], [], false
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read.json b/connector/connect/common/src/test/resources/query-tests/queries/read.json
new file mode 100644
index 00000000000..f5ffb3c961b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read.json
@@ -0,0 +1,13 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "csv",
+      "schema": "name STRING,age INT,job STRING",
+      "options": {
+        "header": "true",
+        "delimiter": ";"
+      },
+      "paths": ["../common/src/test/resources/query-tests/test-data/people.csv"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin
new file mode 100644
index 00000000000..ede57af1130
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read.proto.bin
@@ -0,0 +1,4 @@
+��
+csvname STRING,age INT,job STRING
+headertrue
+ delimiter;"=../common/src/test/resources/query-tests/test-data/people.csv
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json
new file mode 100644
index 00000000000..6095d200f62
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "csv",
+      "paths": ["../common/src/test/resources/query-tests/test-data/people.csv"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin
new file mode 100644
index 00000000000..9bbab0fe2af
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_csv.proto.bin
@@ -0,0 +1,2 @@
+FD
+csv"=../common/src/test/resources/query-tests/test-data/people.csv
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_json.json b/connector/connect/common/src/test/resources/query-tests/queries/read_json.json
new file mode 100644
index 00000000000..2e2f83d1191
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_json.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "json",
+      "paths": ["../common/src/test/resources/query-tests/test-data/people.json"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin
new file mode 100644
index 00000000000..22557aca38d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_json.proto.bin
@@ -0,0 +1,2 @@
+HF
+json">../common/src/test/resources/query-tests/test-data/people.json
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json
new file mode 100644
index 00000000000..caa6c951d3e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "orc",
+      "paths": ["../common/src/test/resources/query-tests/test-data/users.orc"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin
new file mode 100644
index 00000000000..95d07fd0a9b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_orc.proto.bin
@@ -0,0 +1,2 @@
+EC
+orc"<../common/src/test/resources/query-tests/test-data/users.orc
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json
new file mode 100644
index 00000000000..05d799fd9cb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.json
@@ -0,0 +1,8 @@
+{
+  "read": {
+    "dataSource": {
+      "format": "parquet",
+      "paths": ["../common/src/test/resources/query-tests/test-data/users.parquet"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin
new file mode 100644
index 00000000000..5fc1954428f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_parquet.proto.bin
@@ -0,0 +1,2 @@
+MK
+parquet"@../common/src/test/resources/query-tests/test-data/users.parquet \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_path.json b/connector/connect/common/src/test/resources/query-tests/queries/read_path.json new file mode 100644 index 00000000000..c3fc8132a3b --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_path.json @@ -0,0 +1,11 @@ +{ + "read": { + "dataSource": { + "format": "csv", + "schema": "name STRING,age INT", + "options": { + "path": "../common/src/test/resources/query-tests/test-data/people.csv" + } + } + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin new file mode 100644 index 00000000000..01787253c42 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_path.proto.bin @@ -0,0 +1,3 @@ +ca +csvname STRING,age INTE +path=../common/src/test/resources/query-tests/test-data/people.csv \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_table.json b/connector/connect/common/src/test/resources/query-tests/queries/read_table.json new file mode 100644 index 00000000000..634310c27ff --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_table.json @@ -0,0 +1,7 @@ +{ + "read": { + "namedTable": { + "unparsedIdentifier": "myTable" + } + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin new file mode 100644 index 00000000000..f6ffaf988e7 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_table.proto.bin @@ -0,0 +1,3 @@ + + +myTable \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_text.json b/connector/connect/common/src/test/resources/query-tests/queries/read_text.json new file mode 100644 index 00000000000..8dc1e26a70e --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_text.json @@ -0,0 +1,8 @@ +{ + "read": { + "dataSource": { + "format": "text", + "paths": ["../common/src/test/resources/query-tests/test-data/people.txt"] + } + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin new file mode 100644 index 00000000000..97d167f6e2e --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/read_text.proto.bin @@ -0,0 +1,2 @@ +GE +text"=../common/src/test/resources/query-tests/test-data/people.txt \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/table.json b/connector/connect/common/src/test/resources/query-tests/queries/table.json new file mode 100644 index 00000000000..634310c27ff --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/table.json @@ -0,0 +1,7 @@ +{ + "read": { + "namedTable": { + "unparsedIdentifier": "myTable" + } + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/table.proto.bin 
new file mode 100644
index 00000000000..f6ffaf988e7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/table.proto.bin
@@ -0,0 +1,3 @@
+
+
+myTable
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/test-data/people.csv b/connector/connect/common/src/test/resources/query-tests/test-data/people.csv
new file mode 100644
index 00000000000..7fe5adba93d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/test-data/people.csv
@@ -0,0 +1,3 @@
+name;age;job
+Jorge;30;Developer
+Bob;32;Developer
diff --git a/connector/connect/common/src/test/resources/query-tests/test-data/people.json b/connector/connect/common/src/test/resources/query-tests/test-data/people.json
new file mode 100644
index 00000000000..50a859cbd7e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/test-data/people.json
@@ -0,0 +1,3 @@
+{"name":"Michael"}
+{"name":"Andy", "age":30}
+{"name":"Justin", "age":19}
diff --git a/connector/connect/common/src/test/resources/query-tests/test-data/people.txt b/connector/connect/common/src/test/resources/query-tests/test-data/people.txt
new file mode 100644
index 00000000000..3bcace4a44c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/test-data/people.txt
@@ -0,0 +1,3 @@
+Michael, 29
+Andy, 30
+Justin, 19
diff --git a/connector/connect/common/src/test/resources/query-tests/test-data/users.orc b/connector/connect/common/src/test/resources/query-tests/test-data/users.orc
new file mode 100644
index 00000000000..12478a5d03c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/test-data/users.orc differ
diff --git a/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet b/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet
new file mode 100644
index 00000000000..aa527338c43
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/test-data/users.parquet differ

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org