[SPARK-25528][SQL] data source v2 API refactor (batch read)

## What changes were proposed in this pull request?

This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing). It adds the new API for batch read without removing the old APIs, as they are still needed for streaming sources. More concretely, it adds:

1. `TableProvider`, which works like an anonymous catalog
2. `Table`, which represents a structured data set
3. `ScanBuilder` and `Scan`, a logical representation of a data source scan
4. `Batch`, a physical representation of a data source batch scan

## How was this patch tested?

Existing tests.

Closes #23086 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2c94a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2c94a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2c94a3

Branch: refs/heads/master
Commit: 2b2c94a3ee89630047bcdd416a977e0d1cdb1926
Parents: 9cfc3ee
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Nov 30 00:02:43 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Nov 30 00:02:43 2018 -0800

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousSourceSuite.scala   |   4 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |   4 +-
 project/MimaExcludes.scala                      |  48 ++--
 .../spark/sql/sources/v2/SupportsBatchRead.java |  33 +++
 .../org/apache/spark/sql/sources/v2/Table.java  |  59 +++++
 .../spark/sql/sources/v2/TableProvider.java     |  64 +++++
 .../spark/sql/sources/v2/reader/Batch.java      |  48 ++++
 .../reader/OldSupportsReportPartitioning.java   |  38 +++
 .../v2/reader/OldSupportsReportStatistics.java  |  38 +++
 .../spark/sql/sources/v2/reader/Scan.java       |  68 +++++
 .../sql/sources/v2/reader/ScanBuilder.java      |  30 +++
 .../spark/sql/sources/v2/reader/ScanConfig.java |   4 +-
 .../spark/sql/sources/v2/reader/Statistics.java |   2 +-
 .../v2/reader/SupportsPushDownFilters.java      |   4 +-
 .../reader/SupportsPushDownRequiredColumns.java |   4 +-
 .../v2/reader/SupportsReportPartitioning.java   |   8 +-
 .../v2/reader/SupportsReportStatistics.java     |   6 +-
 .../v2/reader/partitioning/Partitioning.java    |   3 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  36 +--
 .../org/apache/spark/sql/DataFrameWriter.scala  |   2 +-
 .../datasources/v2/DataSourceV2Relation.scala   |  90 +++----
 .../datasources/v2/DataSourceV2ScanExec.scala   |  68 ++---
 .../datasources/v2/DataSourceV2Strategy.scala   |  34 +--
 .../v2/DataSourceV2StreamingScanExec.scala      | 120 +++++++++
 .../execution/streaming/ProgressReporter.scala  |   4 +-
 .../continuous/ContinuousExecution.scala        |   5 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 116 +++++----
 .../sources/v2/JavaColumnarDataSourceV2.java    |  27 +-
 .../v2/JavaPartitionAwareDataSource.java        |  29 ++-
 .../v2/JavaSchemaRequiredDataSource.java        |  36 ++-
 .../sql/sources/v2/JavaSimpleBatchTable.java    |  91 +++++++
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  19 +-
 .../sql/sources/v2/JavaSimpleReadSupport.java   |  90 -------
 .../sql/sources/v2/DataSourceV2Suite.scala      | 260 ++++++++++---------
 .../sources/v2/SimpleWritableDataSource.scala   |  35 +--
 .../streaming/continuous/ContinuousSuite.scala  |   4 +-

36 files changed, 1016 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
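Before the diff, a minimal sketch of how the new pieces fit together may help. The following hypothetical source is not part of this commit; `ExampleBatchSource` and its one-column schema are invented for illustration. It wires the four new abstractions end to end: the `TableProvider` returns a `Table`, the `SupportsBatchRead` marker promises that the table's `ScanBuilder` builds a `Scan` with `toBatch()` implemented, and the `Batch` plans input partitions and supplies a reader factory:

```java
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableProvider;
import org.apache.spark.sql.sources.v2.reader.Batch;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
import org.apache.spark.sql.types.StructType;

// Hypothetical data source: a single table with one int column and no rows.
public class ExampleBatchSource implements TableProvider {

  @Override
  public Table getTable(DataSourceOptions options) {
    return new ExampleTable();
  }

  // SupportsBatchRead is an empty marker interface: mixing it in promises
  // that the Scan built from newScanBuilder() implements toBatch().
  static class ExampleTable implements SupportsBatchRead {
    @Override
    public String name() { return "example"; }

    @Override
    public StructType schema() { return new StructType().add("i", "int"); }

    @Override
    public ScanBuilder newScanBuilder(DataSourceOptions options) {
      // A real builder would mix in SupportsPushDownFilters and/or
      // SupportsPushDownRequiredColumns and record the pushdown result in
      // the Scan it builds. ScanBuilder has a single abstract method, so a
      // constructor reference serves as the builder here.
      return ExampleScan::new;
    }
  }

  // For simplicity the logical Scan doubles as its own physical Batch.
  static class ExampleScan implements Scan, Batch {
    @Override
    public StructType readSchema() { return new StructType().add("i", "int"); }

    @Override
    public Batch toBatch() { return this; }

    @Override
    public InputPartition[] planInputPartitions() {
      // One entry per data split; each split becomes one Spark task.
      return new InputPartition[0];
    }

    @Override
    public PartitionReaderFactory createReaderFactory() {
      // Never asked to create a reader in this sketch, since there are no
      // input partitions; a real source would return rows here.
      return partition -> {
        throw new UnsupportedOperationException("no data in this sketch");
      };
    }
  }
}
```

With such a class on the classpath, `spark.read.format(...).load()` resolves the provider, calls `getTable` (passing the user-specified schema when one is given), and plans a `DataSourceV2Relation` only if the returned table mixes in `SupportsBatchRead`; otherwise it falls back to the v1 path. This mirrors the new `DataFrameReader` code in the diff below.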
http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index af51021..9ba066a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.clients.producer.ProducerRecord import org.apache.spark.sql.Dataset -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.streaming.Trigger @@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.executedPlan.collectFirst { - case scan: DataSourceV2ScanExec + case scan: DataSourceV2StreamingScanExec if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] }.exists { config => http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index aa21f12..5549e82 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.streaming.Trigger @@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.executedPlan.collectFirst { - case scan: DataSourceV2ScanExec + case scan: DataSourceV2StreamingScanExec if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] }.exists(_.knownPartitions.size == newCount), http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 5e97d826..fcef424 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -197,37 +197,6 @@ object MimaExcludes { // [SPARK-23781][CORE] Merge 
token renewer functionality into HadoopDelegationTokenManager ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.nextCredentialRenewalTime"), - // Data Source V2 API changes - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ContinuousReadSupport"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ReadSupport"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.WriteSupport"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.StreamWriteSupport"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.MicroBatchReadSupport"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.DataSourceReader"), - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.InputPartition.createPartitionReader"), - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics.estimateStatistics"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"), - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"), - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters"), - ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.InputPartitionReader"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"), 
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"), - // [SPARK-26133][ML] Remove deprecated OneHotEncoder and rename OneHotEncoderEstimator to OneHotEncoder ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator"), ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.OneHotEncoder"), @@ -243,7 +212,22 @@ object MimaExcludes { // [SPARK-26141] Enable custom metrics implementation in shuffle write // Following are Java private classes ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this") + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this"), + + // Data Source V2 API changes + (problem: Problem) => problem match { + case MissingClassProblem(cls) => + !cls.fullName.startsWith("org.apache.spark.sql.sources.v2") + case MissingTypesProblem(newCls, _) => + !newCls.fullName.startsWith("org.apache.spark.sql.sources.v2") + case InheritedNewAbstractMethodProblem(cls, _) => + !cls.fullName.startsWith("org.apache.spark.sql.sources.v2") + case DirectMissingMethodProblem(meth) => + !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2") + case ReversedMissingMethodProblem(meth) => + !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2") + case _ => true + } ) // Exclude rules for 2.4.x http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java new file mode 100644 index 0000000..0df89db --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; + +/** + * An empty mix-in interface for {@link Table}, to indicate that this table supports batch scans. + * <p> + * If a {@link Table} implements this interface, its {@link Table#newScanBuilder(DataSourceOptions)} + * must return a {@link ScanBuilder} that builds a {@link Scan} with {@link Scan#toBatch()} + * implemented.
+ * </p> + */ +@Evolving +public interface SupportsBatchRead extends Table { } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java new file mode 100644 index 0000000..0c65fe0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a Kafka topic, or a table in the + * catalog, etc. + * <p> + * This interface can mix in the following interfaces to support different operations: + * </p> + * <ul> + * <li>{@link SupportsBatchRead}: this table can be read in batch queries.</li> + * </ul> + */ +@Evolving +public interface Table { + + /** + * A name to identify this table. Implementations should provide a meaningful name, like the + * database and table name from the catalog, or the location of files for this table. + */ + String name(); + + /** + * Returns the schema of this table. + */ + StructType schema(); + + /** + * Returns a {@link ScanBuilder} which can be used to build a {@link Scan} later. Spark will call + * this method for each data scanning query. + * <p> + * The builder can take query-specific information to do operator pushdown, and keep this + * information in the created {@link Scan}. + * </p> + */ + ScanBuilder newScanBuilder(DataSourceOptions options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java new file mode 100644 index 0000000..855d5ef --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * <p> + * The major responsibility of this interface is to return a {@link Table} for read/write. + * </p> + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + * topic name, etc. It's an immutable case-insensitive string-to-string map. + */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with a user-specified schema and options. + * <p> + * By default this method throws {@link UnsupportedOperationException}; implementations should + * override it to handle a user-specified schema. + * </p> + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + * topic name, etc. It's an immutable case-insensitive string-to-string map. + * @param schema the user-specified schema. + * @throws UnsupportedOperationException + */ + default Table getTable(DataSourceOptions options, StructType schema) { + String name; + if (this instanceof DataSourceRegister) { + name = ((DataSourceRegister) this).shortName(); + } else { + name = this.getClass().getName(); + } + throw new UnsupportedOperationException( + name + " source does not support user-specified schema"); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java new file mode 100644 index 0000000..bcfa198 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; + +/** + * A physical representation of a data source scan for batch queries. This interface is used to + * provide physical information, like how many partitions the scanned data has, and how to read + * records from the partitions. + */ +@Evolving +public interface Batch { + + /** + * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition} + * represents a data split that can be processed by one Spark task. The number of input + * partitions returned here is the same as the number of RDD partitions this scan outputs. + * <p> + * If the {@link Scan} supports filter pushdown, this Batch is likely configured with a filter + * and is responsible for creating splits for that filter, which is not a full scan. + * </p> + * <p> + * This method will be called only once during a data source scan, to launch one Spark job. + * </p> + */ + InputPartition[] planInputPartitions(); + + /** + * Returns a factory that produces one {@link PartitionReader} for each {@link InputPartition}. + */ + PartitionReaderFactory createReaderFactory(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java new file mode 100644 index 0000000..347a465 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; + +/** + * A mix-in interface for {@link ReadSupport}. Data sources can implement this interface to + * report data partitioning and try to avoid a shuffle on the Spark side.
+ * + * Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition}, + * Spark may avoid adding a shuffle even if the reader does not implement this interface. + */ +@Evolving +// TODO: remove it, after we finish the API refactor completely. +public interface OldSupportsReportPartitioning extends ReadSupport { + + /** + * Returns the output data partitioning that this reader guarantees. + */ + Partitioning outputPartitioning(ScanConfig config); +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java new file mode 100644 index 0000000..0d3ec17 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link ReadSupport}. Data sources can implement this interface to + * report statistics to Spark. + * + * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the + * data source. Implementations that return more accurate statistics based on pushed operators will + * not improve query performance until the planner can push operators before getting stats. + */ +@Evolving +// TODO: remove it, after we finish the API refactor completely. +public interface OldSupportsReportStatistics extends ReadSupport { + + /** + * Returns the estimated statistics of this data source scan. + */ + Statistics estimateStatistics(ScanConfig config); +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java new file mode 100644 index 0000000..4d84fb1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A logical representation of a data source scan. This interface is used to provide logical + * information, like what the actual read schema is. + * <p> + * This logical representation is shared between batch scan, micro-batch streaming scan and + * continuous streaming scan. Data sources must implement the corresponding methods in this + * interface, to match what the table promises to support. For example, {@link #toBatch()} must be + * implemented if the {@link Table} that creates this {@link Scan} implements + * {@link SupportsBatchRead}. + * </p> + */ +@Evolving +public interface Scan { + + /** + * Returns the actual schema of this data source scan, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + */ + StructType readSchema(); + + /** + * A description string of this scan, which may include information like: which filters are + * configured for this scan, the values of important options like path, etc. The + * description doesn't need to include {@link #readSchema()}, as Spark already knows it. + * <p> + * By default this returns the class name of the implementation. Please override it to provide a + * meaningful description. + * </p> + */ + default String description() { + return this.getClass().toString(); + } + + /** + * Returns the physical representation of this scan for a batch query. By default this method + * throws an exception; data sources must override it to provide an implementation if the + * {@link Table} that creates this scan implements {@link SupportsBatchRead}. + * + * @throws UnsupportedOperationException + */ + default Batch toBatch() { + throw new UnsupportedOperationException("Batch scans are not supported"); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java new file mode 100644 index 0000000..d4bc1ff --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; + +/** + * An interface for building the {@link Scan}. Implementations can mix in SupportsPushDownXYZ + * interfaces to do operator pushdown, and keep the operator pushdown result in the returned + * {@link Scan}. + */ +@Evolving +public interface ScanBuilder { + Scan build(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java index a69872a..c8cff68 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java @@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType; * For APIs that take a {@link ScanConfig} as input, like * {@link ReadSupport#planInputPartitions(ScanConfig)}, * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and - * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need to - * cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. + * {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need + * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. */ @Evolving public interface ScanConfig { http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index 14776f3..a0b194a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -23,7 +23,7 @@ import org.apache.spark.annotation.Evolving; /** * An interface to represent statistics for a data source, which is returned by - * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}. + * {@link SupportsReportStatistics#estimateStatistics()}.
*/ @Evolving public interface Statistics { http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index 3a89baa..296d3e4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java @@ -21,11 +21,11 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.Filter; /** - * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this interface to + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to * push down filters to the data source and reduce the size of the data to be read. */ @Evolving -public interface SupportsPushDownFilters extends ScanConfigBuilder { +public interface SupportsPushDownFilters extends ScanBuilder { /** * Pushes down filters, and returns filters that need to be evaluated after scanning. http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java index 1934763..60e71c5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java @@ -21,12 +21,12 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this * interface to push down required columns to the data source and only read these columns during * scan to reduce the size of the data to be read. */ @Evolving -public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder { +public interface SupportsPushDownRequiredColumns extends ScanBuilder { /** * Applies column pruning w.r.t. the given requiredSchema. http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java index 0335c77..ba17581 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java @@ -21,17 +21,17 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; /** - * A mix in interface for {@link BatchReadSupport}. 
Data sources can implement this interface to + * A mix-in interface for {@link Batch}. Data sources can implement this interface to * report data partitioning and try to avoid shuffle at Spark side. * - * Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition}, + * Note that, when a {@link Batch} implementation creates exactly one {@link InputPartition}, * Spark may avoid adding a shuffle even if the reader does not implement this interface. */ @Evolving -public interface SupportsReportPartitioning extends ReadSupport { +public interface SupportsReportPartitioning extends Batch { /** * Returns the output data partitioning that this reader guarantees. */ - Partitioning outputPartitioning(ScanConfig config); + Partitioning outputPartitioning(); } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java index 917372c..d9f5fb6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java @@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.Evolving; /** - * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to + * A mix-in interface for {@link Batch}. Data sources can implement this interface to * report statistics to Spark. * * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the @@ -28,10 +28,10 @@ import org.apache.spark.annotation.Evolving; * not improve query performance until the planner can push operators before getting stats. */ @Evolving -public interface SupportsReportStatistics extends ReadSupport { +public interface SupportsReportStatistics extends Batch { /** * Returns the estimated statistics of this data source scan. */ - Statistics estimateStatistics(ScanConfig config); + Statistics estimateStatistics(); } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java index c9a0026..c7370eb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java @@ -19,12 +19,11 @@ package org.apache.spark.sql.sources.v2.reader.partitioning; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.ScanConfig; import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning; /** * An interface to represent the output data partitioning for a data source, which is returned by - * {@link SupportsReportPartitioning#outputPartitioning(ScanConfig)}. Note that this should work + * {@link SupportsReportPartitioning#outputPartitioning()}.
Note that this should work * like a snapshot. Once created, it should be deterministic and always report the same number of * partitions and the same "satisfy" result for a certain distribution. */ http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index da88598..661fe98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -194,20 +194,26 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf) - if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val ds = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - if (ds.isInstanceOf[BatchReadSupportProvider]) { - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - ds = ds, conf = sparkSession.sessionState.conf) - val pathsOption = { - val objectMapper = new ObjectMapper() - DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) - } - Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - ds, sessionOptions ++ extraOptions.toMap + pathsOption, - userSpecifiedSchema = userSpecifiedSchema)) - } else { - loadV1Source(paths: _*) + if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( + ds = provider, conf = sparkSession.sessionState.conf) + val pathsOption = { + val objectMapper = new ObjectMapper() + DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) + } + val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + val dsOptions = new DataSourceOptions(finalOptions.asJava) + val table = userSpecifiedSchema match { + case Some(schema) => provider.getTable(dsOptions, schema) + case _ => provider.getTable(dsOptions) + } + table match { + case s: SupportsBatchRead => + Dataset.ofRows(sparkSession, DataSourceV2Relation.create( + provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema)) + + case _ => loadV1Source(paths: _*) } } else { loadV1Source(paths: _*) http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 5a807d3..b9c4076 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -252,7 
+252,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val options = sessionOptions ++ extraOptions if (mode == SaveMode.Append) { - val relation = DataSourceV2Relation.create(source, options) + val relation = DataSourceV2Relation.createRelationForWrite(source, options) runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index f7e2959..0a6b0af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -22,13 +22,13 @@ import java.util.UUID import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, BatchWriteSupportProvider, DataSourceOptions, DataSourceV2} -import org.apache.spark.sql.sources.v2.reader.{BatchReadSupport, ReadSupport, ScanConfigBuilder, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType @@ -40,32 +40,38 @@ import org.apache.spark.sql.types.StructType * @param userSpecifiedSchema The user-specified schema for this scan. */ case class DataSourceV2Relation( - source: DataSourceV2, - readSupport: BatchReadSupport, + // TODO: remove `source` when we finish API refactor for write. 
+ source: TableProvider, + table: SupportsBatchRead, output: Seq[AttributeReference], options: Map[String, String], - tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None) - extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat { + extends LeafNode with MultiInstanceRelation with NamedRelation { import DataSourceV2Relation._ - override def name: String = { - tableIdent.map(_.unquotedString).getOrElse(s"${source.name}:unknown") - } - - override def pushedFilters: Seq[Expression] = Seq.empty + override def name: String = table.name() - override def simpleString: String = "RelationV2 " + metadataString + override def simpleString: String = { + s"RelationV2${truncatedString(output, "[", ", ", "]")} $name" + } def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) - override def computeStats(): Statistics = readSupport match { - case r: SupportsReportStatistics => - val statistics = r.estimateStatistics(readSupport.newScanConfigBuilder().build()) - Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) - case _ => - Statistics(sizeInBytes = conf.defaultSizeInBytes) + def newScanBuilder(): ScanBuilder = { + val dsOptions = new DataSourceOptions(options.asJava) + table.newScanBuilder(dsOptions) + } + + override def computeStats(): Statistics = { + val scan = newScanBuilder().build() + scan match { + case r: SupportsReportStatistics => + val statistics = r.estimateStatistics() + Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) + case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } } override def newInstance(): DataSourceV2Relation = { @@ -109,7 +115,7 @@ case class StreamingDataSourceV2Relation( } override def computeStats(): Statistics = readSupport match { - case r: SupportsReportStatistics => + case r: OldSupportsReportStatistics => val statistics = r.estimateStatistics(scanConfigBuilder.build()) Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes)) case _ => @@ -119,15 +125,6 @@ case class StreamingDataSourceV2Relation( object DataSourceV2Relation { private implicit class SourceHelpers(source: DataSourceV2) { - def asReadSupportProvider: BatchReadSupportProvider = { - source match { - case provider: BatchReadSupportProvider => - provider - case _ => - throw new AnalysisException(s"Data source is not readable: $name") - } - } - def asWriteSupportProvider: BatchWriteSupportProvider = { source match { case provider: BatchWriteSupportProvider => @@ -146,18 +143,6 @@ object DataSourceV2Relation { } } - def createReadSupport( - options: Map[String, String], - userSpecifiedSchema: Option[StructType]): BatchReadSupport = { - val v2Options = new DataSourceOptions(options.asJava) - userSpecifiedSchema match { - case Some(s) => - asReadSupportProvider.createBatchReadSupport(s, v2Options) - case _ => - asReadSupportProvider.createBatchReadSupport(v2Options) - } - } - def createWriteSupport( options: Map[String, String], schema: StructType): BatchWriteSupport = { @@ -170,20 +155,21 @@ object DataSourceV2Relation { } def create( - source: DataSourceV2, + provider: TableProvider, + table: SupportsBatchRead, options: Map[String, String], - tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { - val readSupport = source.createReadSupport(options, userSpecifiedSchema) - val output = readSupport.fullSchema().toAttributes - val ident = 
tableIdent.orElse(tableFromOptions(options)) - DataSourceV2Relation( - source, readSupport, output, options, ident, userSpecifiedSchema) + val output = table.schema().toAttributes + DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema) } - private def tableFromOptions(options: Map[String, String]): Option[TableIdentifier] = { - options - .get(DataSourceOptions.TABLE_KEY) - .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY))) + // TODO: remove this when we finish API refactor for write. + def createRelationForWrite( + source: DataSourceV2, + options: Map[String, String]): DataSourceV2Relation = { + val provider = source.asInstanceOf[TableProvider] + val dsOptions = new DataSourceOptions(options.asJava) + val table = provider.getTable(dsOptions) + create(provider, table.asInstanceOf[SupportsBatchRead], options) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 25f86a6..725bcc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -22,60 +22,47 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], - @transient source: DataSourceV2, - @transient options: Map[String, String], - @transient pushedFilters: Seq[Expression], - @transient readSupport: ReadSupport, - @transient scanConfig: ScanConfig) - extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { + scanDesc: String, + @transient batch: Batch) + extends LeafExecNode with ColumnarBatchScan { - override def simpleString: String = "ScanV2 " + metadataString + override def simpleString: String = { + s"ScanV2${truncatedString(output, "[", ", ", "]")} $scanDesc" + } // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
override def equals(other: Any): Boolean = other match { - case other: DataSourceV2ScanExec => - output == other.output && readSupport.getClass == other.readSupport.getClass && - options == other.options + case other: DataSourceV2ScanExec => this.batch == other.batch case _ => false } - override def hashCode(): Int = { - Seq(output, source, options).hashCode() - } + override def hashCode(): Int = batch.hashCode() + + private lazy val partitions = batch.planInputPartitions() + + private lazy val readerFactory = batch.createReaderFactory() - override def outputPartitioning: physical.Partitioning = readSupport match { + override def outputPartitioning: physical.Partitioning = batch match { case _ if partitions.length == 1 => SinglePartition case s: SupportsReportPartitioning => new DataSourcePartitioning( - s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) + s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) case _ => super.outputPartitioning } - private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) - - private lazy val readerFactory = readSupport match { - case r: BatchReadSupport => r.createReaderFactory(scanConfig) - case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig) - case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig) - case _ => throw new IllegalStateException("unknown read support: " + readSupport) - } - - // TODO: clean this up when we have dedicated scan plan for continuous streaming. - override val supportsBatch: Boolean = { + override def supportsBatch: Boolean = { require(partitions.forall(readerFactory.supportColumnarReads) || !partitions.exists(readerFactory.supportColumnarReads), "Cannot mix row-based and columnar input partitions.") @@ -83,25 +70,8 @@ case class DataSourceV2ScanExec( partitions.exists(readerFactory.supportColumnarReads) } - private lazy val inputRDD: RDD[InternalRow] = readSupport match { - case _: ContinuousReadSupport => - assert(!supportsBatch, - "continuous stream reader does not support columnar read yet.") - EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), - sparkContext.env) - .askSync[Unit](SetReaderPartitions(partitions.size)) - new ContinuousDataSourceRDD( - sparkContext, - sqlContext.conf.continuousStreamingExecutorQueueSize, - sqlContext.conf.continuousStreamingExecutorPollIntervalMs, - partitions, - schema, - readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) - - case _ => - new DataSourceRDD( - sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch) + private lazy val inputRDD: RDD[InternalRow] = { + new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch) } override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD) http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 9a3109e..2e26fce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -37,9 +37,9 @@ object 
DataSourceV2Strategy extends Strategy { * @return pushed filter and post-scan filters. */ private def pushFilters( - configBuilder: ScanConfigBuilder, + scanBuilder: ScanBuilder, filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { - configBuilder match { + scanBuilder match { case r: SupportsPushDownFilters => // A map from translated data source filters to original catalyst filter expressions. val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression] @@ -76,18 +76,18 @@ object DataSourceV2Strategy extends Strategy { */ // TODO: nested column pruning. private def pruneColumns( - configBuilder: ScanConfigBuilder, + scanBuilder: ScanBuilder, relation: DataSourceV2Relation, - exprs: Seq[Expression]): (ScanConfig, Seq[AttributeReference]) = { - configBuilder match { + exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = { + scanBuilder match { case r: SupportsPushDownRequiredColumns => val requiredColumns = AttributeSet(exprs.flatMap(_.references)) val neededOutput = relation.output.filter(requiredColumns.contains) if (neededOutput != relation.output) { r.pruneColumns(neededOutput.toStructType) - val config = r.build() + val scan = r.build() val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap - config -> config.readSchema().toAttributes.map { + scan -> scan.readSchema().toAttributes.map { // We have to keep the attribute id during transformation. a => a.withExprId(nameToAttr(a.name).exprId) } @@ -95,19 +95,19 @@ object DataSourceV2Strategy extends Strategy { r.build() -> relation.output } - case _ => configBuilder.build() -> relation.output + case _ => scanBuilder.build() -> relation.output } } override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => - val configBuilder = relation.readSupport.newScanConfigBuilder() + val scanBuilder = relation.newScanBuilder() // `pushedFilters` will be pushed down and evaluated in the underlying data sources. // `postScanFilters` need to be evaluated after the scan. // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter. 
- val (pushedFilters, postScanFilters) = pushFilters(configBuilder, filters) - val (config, output) = pruneColumns(configBuilder, relation, project ++ postScanFilters) + val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters) + val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" |Pushing operators to ${relation.source.getClass} @@ -116,16 +116,10 @@ object DataSourceV2Strategy extends Strategy { |Output: ${output.mkString(", ")} """.stripMargin) - val scan = DataSourceV2ScanExec( - output, - relation.source, - relation.options, - pushedFilters, - relation.readSupport, - config) + val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch) val filterCondition = postScanFilters.reduceLeftOption(And) - val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan) // always add the projection, which will produce unsafe rows required by some operators ProjectExec(project, withFilter) :: Nil @@ -135,7 +129,7 @@ object DataSourceV2Strategy extends Strategy { val scanConfig = r.scanConfigBuilder.build() // ensure there is a projection, which will produce unsafe rows required by some operators ProjectExec(r.output, - DataSourceV2ScanExec( + DataSourceV2StreamingScanExec( r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil case WriteToDataSourceV2(writer, query) => http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala new file mode 100644 index 0000000..c872940 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.DataSourceV2 +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} + +/** + * Physical plan node for scanning data from a data source. + */ +// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we finish the API refactor +// completely. +case class DataSourceV2StreamingScanExec( + output: Seq[AttributeReference], + @transient source: DataSourceV2, + @transient options: Map[String, String], + @transient pushedFilters: Seq[Expression], + @transient readSupport: ReadSupport, + @transient scanConfig: ScanConfig) + extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { + + override def simpleString: String = "ScanV2 " + metadataString + + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. + override def equals(other: Any): Boolean = other match { + case other: DataSourceV2StreamingScanExec => + output == other.output && readSupport.getClass == other.readSupport.getClass && + options == other.options + case _ => false + } + + override def hashCode(): Int = { + Seq(output, source, options).hashCode() + } + + override def outputPartitioning: physical.Partitioning = readSupport match { + case _ if partitions.length == 1 => + SinglePartition + + case s: OldSupportsReportPartitioning => + new DataSourcePartitioning( + s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) + + case _ => super.outputPartitioning + } + + private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) + + private lazy val readerFactory = readSupport match { + case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig) + case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig) + case _ => throw new IllegalStateException("unknown read support: " + readSupport) + } + + override val supportsBatch: Boolean = { + require(partitions.forall(readerFactory.supportColumnarReads) || + !partitions.exists(readerFactory.supportColumnarReads), + "Cannot mix row-based and columnar input partitions.") + + partitions.exists(readerFactory.supportColumnarReads) + } + + private lazy val inputRDD: RDD[InternalRow] = readSupport match { + case _: ContinuousReadSupport => + assert(!supportsBatch, + "continuous stream reader does not support columnar read yet.") + EpochCoordinatorRef.get( + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) + .askSync[Unit](SetReaderPartitions(partitions.size)) + new ContinuousDataSourceRDD( + sparkContext, + sqlContext.conf.continuousStreamingExecutorQueueSize, + sqlContext.conf.continuousStreamingExecutorPollIntervalMs, + partitions, + schema, + readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) + + case _ => + new DataSourceRDD( + sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch) + } + + override def inputRDDs(): 
Seq[RDD[InternalRow]] = Seq(inputRDD) + + override protected def doExecute(): RDD[InternalRow] = { + if (supportsBatch) { + WholeStageCodegenExec(this)(codegenStageId = 0).execute() + } else { + val numOutputRows = longMetric("numOutputRows") + inputRDD.map { r => + numOutputRows += 1 + r + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 392229b..6a22f0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent @@ -256,7 +256,7 @@ trait ProgressReporter extends Logging { // (can happen with self-unions or self-joins). This means the source is scanned multiple // times in the query, we should count the numRows for each scan. val sourceToInputRowsTuples = lastExecution.executedPlan.collect { - case s: DataSourceV2ScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] => + case s: DataSourceV2StreamingScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] => val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) val source = s.readSupport.asInstanceOf[BaseStreamingSource] source -> numRows http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 1eab551..af23c5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamingDataSourceV2Relation} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation} import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 import 
org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider} @@ -206,7 +206,8 @@ class ContinuousExecution( } val (readSupport, scanConfig) = lastExecution.executedPlan.collect { - case scan: DataSourceV2ScanExec if scan.readSupport.isInstanceOf[ContinuousReadSupport] => + case scan: DataSourceV2StreamingScanExec + if scan.readSupport.isInstanceOf[ContinuousReadSupport] => scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig }.head http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java index 5602310..2612b61 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -24,62 +24,29 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.GreaterThan; -import org.apache.spark.sql.sources.v2.BatchReadSupportProvider; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; -public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportProvider { +public class JavaAdvancedDataSourceV2 implements TableProvider { - public class ReadSupport extends JavaSimpleReadSupport { - @Override - public ScanConfigBuilder newScanConfigBuilder() { - return new AdvancedScanConfigBuilder(); - } - - @Override - public InputPartition[] planInputPartitions(ScanConfig config) { - Filter[] filters = ((AdvancedScanConfigBuilder) config).filters; - List<InputPartition> res = new ArrayList<>(); - - Integer lowerBound = null; - for (Filter filter : filters) { - if (filter instanceof GreaterThan) { - GreaterThan f = (GreaterThan) filter; - if ("i".equals(f.attribute()) && f.value() instanceof Integer) { - lowerBound = (Integer) f.value(); - break; - } - } - } - - if (lowerBound == null) { - res.add(new JavaRangeInputPartition(0, 5)); - res.add(new JavaRangeInputPartition(5, 10)); - } else if (lowerBound < 4) { - res.add(new JavaRangeInputPartition(lowerBound + 1, 5)); - res.add(new JavaRangeInputPartition(5, 10)); - } else if (lowerBound < 9) { - res.add(new JavaRangeInputPartition(lowerBound + 1, 10)); + @Override + public Table getTable(DataSourceOptions options) { + return new JavaSimpleBatchTable() { + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + return new AdvancedScanBuilder(); } - - return res.stream().toArray(InputPartition[]::new); - } - - @Override - public PartitionReaderFactory createReaderFactory(ScanConfig config) { - StructType requiredSchema = ((AdvancedScanConfigBuilder) config).requiredSchema; - return new AdvancedReaderFactory(requiredSchema); - } + }; } - public static class AdvancedScanConfigBuilder implements ScanConfigBuilder, ScanConfig, + static class AdvancedScanBuilder implements ScanBuilder, 
Scan, SupportsPushDownFilters, SupportsPushDownRequiredColumns { - // Exposed for testing. - public StructType requiredSchema = new StructType().add("i", "int").add("j", "int"); - public Filter[] filters = new Filter[0]; + private StructType requiredSchema = new StructType().add("i", "int").add("j", "int"); + private Filter[] filters = new Filter[0]; @Override public void pruneColumns(StructType requiredSchema) { @@ -121,9 +88,58 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportP } @Override - public ScanConfig build() { + public Scan build() { return this; } + + @Override + public Batch toBatch() { + return new AdvancedBatch(requiredSchema, filters); + } + } + + public static class AdvancedBatch implements Batch { + // Exposed for testing. + public StructType requiredSchema; + public Filter[] filters; + + AdvancedBatch(StructType requiredSchema, Filter[] filters) { + this.requiredSchema = requiredSchema; + this.filters = filters; + } + + @Override + public InputPartition[] planInputPartitions() { + List<InputPartition> res = new ArrayList<>(); + + Integer lowerBound = null; + for (Filter filter : filters) { + if (filter instanceof GreaterThan) { + GreaterThan f = (GreaterThan) filter; + if ("i".equals(f.attribute()) && f.value() instanceof Integer) { + lowerBound = (Integer) f.value(); + break; + } + } + } + + if (lowerBound == null) { + res.add(new JavaRangeInputPartition(0, 5)); + res.add(new JavaRangeInputPartition(5, 10)); + } else if (lowerBound < 4) { + res.add(new JavaRangeInputPartition(lowerBound + 1, 5)); + res.add(new JavaRangeInputPartition(5, 10)); + } else if (lowerBound < 9) { + res.add(new JavaRangeInputPartition(lowerBound + 1, 10)); + } + + return res.stream().toArray(InputPartition[]::new); + } + + @Override + public PartitionReaderFactory createReaderFactory() { + return new AdvancedReaderFactory(requiredSchema); + } } static class AdvancedReaderFactory implements PartitionReaderFactory { @@ -165,10 +181,4 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportP }; } } - - - @Override - public BatchReadSupport createBatchReadSupport(DataSourceOptions options) { - return new ReadSupport(); - } } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java index 28a9330..d72ab53 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java @@ -21,21 +21,21 @@ import java.io.IOException; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; -import org.apache.spark.sql.sources.v2.BatchReadSupportProvider; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; -public class JavaColumnarDataSourceV2 implements 
DataSourceV2, BatchReadSupportProvider { +public class JavaColumnarDataSourceV2 implements TableProvider { - class ReadSupport extends JavaSimpleReadSupport { + class MyScanBuilder extends JavaSimpleScanBuilder { @Override - public InputPartition[] planInputPartitions(ScanConfig config) { + public InputPartition[] planInputPartitions() { InputPartition[] partitions = new InputPartition[2]; partitions[0] = new JavaRangeInputPartition(0, 50); partitions[1] = new JavaRangeInputPartition(50, 90); @@ -43,11 +43,21 @@ public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportP } @Override - public PartitionReaderFactory createReaderFactory(ScanConfig config) { + public PartitionReaderFactory createReaderFactory() { return new ColumnarReaderFactory(); } } + @Override + public Table getTable(DataSourceOptions options) { + return new JavaSimpleBatchTable() { + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + return new MyScanBuilder(); + } + }; + } + static class ColumnarReaderFactory implements PartitionReaderFactory { private static final int BATCH_SIZE = 20; @@ -106,9 +116,4 @@ public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportP }; } } - - @Override - public BatchReadSupport createBatchReadSupport(DataSourceOptions options) { - return new ReadSupport(); - } } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java index 18a11dd..a513bfb 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java @@ -22,18 +22,20 @@ import java.util.Arrays; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.sources.v2.*; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution; import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution; import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; -public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupportProvider { +public class JavaPartitionAwareDataSource implements TableProvider { - class ReadSupport extends JavaSimpleReadSupport implements SupportsReportPartitioning { + class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportPartitioning { @Override - public InputPartition[] planInputPartitions(ScanConfig config) { + public InputPartition[] planInputPartitions() { InputPartition[] partitions = new InputPartition[2]; partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}); partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}); @@ -41,16 +43,26 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupp } @Override - public PartitionReaderFactory createReaderFactory(ScanConfig config) { + public 
PartitionReaderFactory createReaderFactory() { return new SpecificReaderFactory(); } @Override - public Partitioning outputPartitioning(ScanConfig config) { + public Partitioning outputPartitioning() { return new MyPartitioning(); } } + @Override + public Table getTable(DataSourceOptions options) { + return new JavaSimpleBatchTable() { + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + return new MyScanBuilder(); + } + }; + } + static class MyPartitioning implements Partitioning { @Override @@ -106,9 +118,4 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupp }; } } - - @Override - public BatchReadSupport createBatchReadSupport(DataSourceOptions options) { - return new ReadSupport(); - } } http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java index cc9ac04..815d57b 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -17,39 +17,51 @@ package test.org.apache.spark.sql.sources.v2; -import org.apache.spark.sql.sources.v2.BatchReadSupportProvider; import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; -public class JavaSchemaRequiredDataSource implements DataSourceV2, BatchReadSupportProvider { +public class JavaSchemaRequiredDataSource implements TableProvider { - class ReadSupport extends JavaSimpleReadSupport { - private final StructType schema; + class MyScanBuilder extends JavaSimpleScanBuilder { - ReadSupport(StructType schema) { + private StructType schema; + + MyScanBuilder(StructType schema) { this.schema = schema; } @Override - public StructType fullSchema() { + public StructType readSchema() { return schema; } @Override - public InputPartition[] planInputPartitions(ScanConfig config) { + public InputPartition[] planInputPartitions() { return new InputPartition[0]; } } @Override - public BatchReadSupport createBatchReadSupport(DataSourceOptions options) { - throw new IllegalArgumentException("requires a user-supplied schema"); + public Table getTable(DataSourceOptions options, StructType schema) { + return new JavaSimpleBatchTable() { + + @Override + public StructType schema() { + return schema; + } + + @Override + public ScanBuilder newScanBuilder(DataSourceOptions options) { + return new MyScanBuilder(schema); + } + }; } @Override - public BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) { - return new ReadSupport(schema); + public Table getTable(DataSourceOptions options) { + throw new IllegalArgumentException("requires a user-supplied schema"); } }
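----------------------------------------------------------------------
For readers tracking the refactor, the batch read path now composes as TableProvider -> Table -> ScanBuilder -> Scan -> Batch -> PartitionReaderFactory. Below is a minimal Scala sketch of a source written against the new abstractions, modeled on the JavaSimpleBatchTable and JavaSimpleDataSourceV2 test classes in this diff. The names SimpleTableProvider, SimpleBatchTable, SimpleScanBuilder and RangeInputPartition are illustrative, not part of the commit, and the sketch assumes the interface shapes shown in this patch (in particular that SupportsBatchRead is the empty marker mix-in for tables whose Scan implements toBatch()).

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
  import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, Table, TableProvider}
  import org.apache.spark.sql.sources.v2.reader._
  import org.apache.spark.sql.types.StructType

  // Illustrative provider: the "anonymous catalog" that hands out a Table.
  class SimpleTableProvider extends TableProvider {
    override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable
  }

  // A Table with a fixed schema; SupportsBatchRead marks it batch-readable.
  class SimpleBatchTable extends Table with SupportsBatchRead {
    override def schema(): StructType = new StructType().add("i", "int").add("j", "int")
    override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new SimpleScanBuilder
  }

  // As in the test sources, one object serves as ScanBuilder, Scan and Batch.
  class SimpleScanBuilder extends ScanBuilder with Scan with Batch {
    override def build(): Scan = this
    override def toBatch(): Batch = this
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
    override def planInputPartitions(): Array[InputPartition] =
      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
    override def createReaderFactory(): PartitionReaderFactory = new SimpleReaderFactory
  }

  // InputPartition is a serializable marker; this carries the range bounds.
  case class RangeInputPartition(start: Int, end: Int) extends InputPartition

  // Emits (i, -i) rows for the partition's range, like the simple test sources.
  class SimpleReaderFactory extends PartitionReaderFactory {
    override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
      val p = partition.asInstanceOf[RangeInputPartition]
      new PartitionReader[InternalRow] {
        private var current = p.start - 1
        override def next(): Boolean = { current += 1; current < p.end }
        override def get(): InternalRow = new GenericInternalRow(Array[Any](current, -current))
        override def close(): Unit = {}
      }
    }
  }

With that on the classpath, the unchanged entry point applies: spark.read.format(classOf[SimpleTableProvider].getName).load() should yield a DataFrame with columns i and j, planned through DataSourceV2ScanExec via Scan#toBatch.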
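Push-down now happens on the ScanBuilder before build() freezes the Scan, which is what the new pushFilters/pruneColumns in DataSourceV2Strategy above drive. Here is a condensed Scala analogue of AdvancedScanBuilder from JavaAdvancedDataSourceV2, keeping that test's convention that only GreaterThan on column i is accepted; the class name is illustrative, and the partition planning that consumes the pushed state lives in a Batch like AdvancedBatch above, so it is elided here:

  import org.apache.spark.sql.sources.{Filter, GreaterThan}
  import org.apache.spark.sql.sources.v2.reader._
  import org.apache.spark.sql.types.StructType

  class AdvancedScanBuilderSketch extends ScanBuilder with Scan
      with SupportsPushDownFilters with SupportsPushDownRequiredColumns {

    private var requiredSchema = new StructType().add("i", "int").add("j", "int")
    private var pushed = Array.empty[Filter]

    // Column pruning: Spark hands over the narrower schema it needs.
    override def pruneColumns(requiredSchema: StructType): Unit =
      this.requiredSchema = requiredSchema

    // Accept GreaterThan on `i`; everything else is handed back to Spark
    // to be re-evaluated as a post-scan filter.
    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
      val (supported, unsupported) = filters.partition {
        case GreaterThan("i", _: Int) => true
        case _ => false
      }
      pushed = supported
      unsupported
    }

    override def pushedFilters(): Array[Filter] = pushed

    override def readSchema(): StructType = requiredSchema

    // build() freezes the pushed state into an immutable Scan.
    override def build(): Scan = this

    // toBatch() would pass requiredSchema and pushed to a Batch whose
    // planInputPartitions() mirrors AdvancedBatch in the diff above.
  }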
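Sources that cannot infer a schema implement the two-argument getTable overload instead and reject the no-schema call, as JavaSchemaRequiredDataSource does above. Presumably this patch's DataFrameReader changes route a user-supplied schema to that overload, so from the user side (given a SparkSession named spark) usage would look like:

  import org.apache.spark.sql.types.StructType

  val schema = new StructType().add("i", "int").add("j", "int")
  val df = spark.read
    .format("test.org.apache.spark.sql.sources.v2.JavaSchemaRequiredDataSource")
    .schema(schema)
    .load()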