http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c7b74f3..946b636 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.{Locale, Optional, Properties}
+import java.util.{Locale, Properties}
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -44,11 +44,9 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.sql.types.StructType
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
@@ -118,14 +116,16 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
       query.nonEmpty,
       "Cannot add data when there is no query for finding the active kafka source")
 
-    val sources = {
+    val sources: Seq[BaseStreamingSource] = {
       query.get.logicalPlan.collect {
         case StreamingExecutionRelation(source: KafkaSource, _) => source
-        case StreamingExecutionRelation(source: KafkaMicroBatchReader, _) => source
+        case StreamingExecutionRelation(source: KafkaMicroBatchReadSupport, _) => source
       } ++ (query.get.lastExecution match {
        case null => Seq()
        case e => e.logical.collect {
-          case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
+          case r: StreamingDataSourceV2Relation
+              if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+            r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
        }
      })
    }.distinct
@@ -650,7 +650,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       makeSureGetOffsetCalled,
       AssertOnQuery { query =>
         query.logicalPlan.collect {
-          case StreamingExecutionRelation(_: KafkaMicroBatchReader, _) => true
+          case StreamingExecutionRelation(_: KafkaMicroBatchReadSupport, _) => true
         }.nonEmpty
       }
     )
@@ -675,17 +675,16 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
         "kafka.bootstrap.servers" -> testUtils.brokerAddress,
         "subscribe" -> topic
       ) ++ Option(minPartitions).map { p => "minPartitions" -> p}
-      val reader = provider.createMicroBatchReader(
-        Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava))
-      reader.setOffsetRange(
-        Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
-        Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
-      )
-      val factories = reader.planInputPartitions().asScala
+      val readSupport = provider.createMicroBatchReadSupport(
+        dir.getAbsolutePath, new DataSourceOptions(options.asJava))
+      val config = readSupport.newScanConfigBuilder(
+        KafkaSourceOffset(Map(tp -> 0L)),
+        KafkaSourceOffset(Map(tp -> 100L))).build()
+      val inputPartitions = readSupport.planInputPartitions(config)
         .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
-      withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
-        assert(factories.size == numPartitionsGenerated)
-        factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
+      withClue(s"minPartitions = $minPartitions generated factories $inputPartitions\n\t") {
+        assert(inputPartitions.size == numPartitionsGenerated)
+        inputPartitions.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
       }
     }
   }
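The hunk above captures the shape of the migration: offsets that used to be pushed into a mutable reader via setOffsetRange are now captured in an immutable ScanConfig built from the start and end offsets. A minimal sketch of the same call sequence outside the test harness, assuming the streaming-side MicroBatchReadSupport API introduced by this commit (the scanOneMicroBatch wrapper and its arguments are hypothetical, not code from this commit):

    import java.util.Map;

    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
    import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
    import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

    class MicroBatchScanSketch {
      // Drives one micro-batch against the new API. The provider, checkpoint
      // location, options and offsets are placeholders supplied by the caller.
      static void scanOneMicroBatch(
          MicroBatchReadSupportProvider provider,
          String checkpointDir,
          Map<String, String> options,
          Offset start,
          Offset end) {
        MicroBatchReadSupport readSupport =
            provider.createMicroBatchReadSupport(checkpointDir, new DataSourceOptions(options));
        // The offset range now lives in an immutable ScanConfig instead of a
        // mutable setOffsetRange() call on the reader.
        ScanConfig config = readSupport.newScanConfigBuilder(start, end).build();
        // Plan the splits for this batch and the factory that will read them.
        InputPartition[] partitions = readSupport.planInputPartitions(config);
        PartitionReaderFactory readerFactory = readSupport.createReaderFactory(config);
        // partitions are serialized to executors; readerFactory creates one
        // PartitionReader per partition there.
      }
    }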
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
new file mode 100644
index 0000000..f403dc6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for batch processing.
+ *
+ * This interface is used to create {@link BatchReadSupport} instances when end users run
+ * {@code SparkSession.read.format(...).option(...).load()}.
+ */
+@InterfaceStability.Evolving
+public interface BatchReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link BatchReadSupport} instance to load the data from this data source with a
+   * user specified schema, which is called by Spark at the beginning of each batch query.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle user specified schemas.
+   *
+   * @param schema the user specified schema.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
+   * called by Spark at the beginning of each batch query.
+   *
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  BatchReadSupport createBatchReadSupport(DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
new file mode 100644
index 0000000..bd10c33
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for batch processing.
+ *
+ * This interface is used to create {@link BatchWriteSupport} instances when end users run
+ * {@code Dataset.write.format(...).option(...).save()}.
+ */
+@InterfaceStability.Evolving
+public interface BatchWriteSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
+   * which is called by Spark at the beginning of each batch query.
+   *
+   * Data sources can return an empty Optional if no writing is needed under the given save mode.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
+   * @param schema the schema of the data to be written.
+   * @param mode the save mode which determines what to do when the data are already in this data
+   *             source, please refer to {@link SaveMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   * @return a write support to write data to this data source.
+   */
+  Optional<BatchWriteSupport> createBatchWriteSupport(
+      String queryId,
+      StructType schema,
+      SaveMode mode,
+      DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
deleted file mode 100644
index 7df5a45..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link ContinuousReader} to scan the data from this data source.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReader createContinuousReader(
-    Optional<StructType> schema,
-    String checkpointLocation,
-    DataSourceOptions options);
-}
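To make the provider split concrete before moving on to the streaming interfaces, here is a hedged sketch of a batch-only source mixing in the new provider interface; ExampleBatchSource and ExampleBatchReadSupport are hypothetical names, not classes from this commit:

    import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;

    // Hypothetical source that only supports reads with its own inferred schema;
    // a user-specified schema falls through to the default method, which fails.
    public class ExampleBatchSource implements DataSourceV2, BatchReadSupportProvider {
      @Override
      public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
        // ExampleBatchReadSupport is a stand-in for the source's implementation of
        // BatchReadSupport (fullSchema, newScanConfigBuilder, planInputPartitions,
        // createReaderFactory); it is not shown here.
        return new ExampleBatchReadSupport(options);
      }
    }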
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
new file mode 100644
index 0000000..824c290
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for continuous stream processing.
+ *
+ * This interface is used to create {@link ContinuousReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * continuous streaming query.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle user specified schemas.
+   *
+   * @param schema the user provided schema.
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default ContinuousReadSupport createContinuousReadSupport(
+      StructType schema,
+      String checkpointLocation,
+      DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
+   * source, which is called by Spark at the beginning of each continuous streaming query.
+   *
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReadSupport createContinuousReadSupport(
+      String checkpointLocation,
+      DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index 6234071..6e31e84 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -22,9 +22,13 @@ import org.apache.spark.annotation.InterfaceStability;
 /**
  * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
  *
- * Note that this is an empty interface. Data source implementations should mix-in at least one of
- * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
- * a dummy data source which is un-readable/writable.
+ * Note that this is an empty interface. Data source implementations must mix in interfaces such as
+ * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
+ * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
+ * is un-readable/writable.
+ *
+ * If Spark fails to execute any methods in the implementations of this interface (by throwing an
+ * exception), the read action will fail and no Spark job will be submitted.
  */
 @InterfaceStability.Evolving
 public interface DataSourceV2 {}
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
deleted file mode 100644
index 7f4a2c9..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide streaming micro-batch data reading ability.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
-   * streaming query.
-   *
-   * The execution engine will create a micro-batch reader at the start of a streaming query,
-   * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
-   * then call stop() when the execution is complete. Note that a single query may have multiple
-   * executions due to restart or failure recovery.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
-   *                           recovery. Readers for the same logical source in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema,
-      String checkpointLocation,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
new file mode 100644
index 0000000..61c08e7
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for micro-batch stream processing.
+ *
+ * This interface is used to create {@link MicroBatchReadSupport} instances when end users run
+ * {@code SparkSession.readStream.format(...).option(...).load()} with a micro-batch trigger.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupportProvider extends DataSourceV2 {
+
+  /**
+   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+   * source with a user specified schema, which is called by Spark at the beginning of each
+   * micro-batch streaming query.
+   *
+   * By default this method throws {@link UnsupportedOperationException}; implementations should
+   * override it to handle user specified schemas.
+   *
+   * @param schema the user provided schema.
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  default MicroBatchReadSupport createMicroBatchReadSupport(
+      StructType schema,
+      String checkpointLocation,
+      DataSourceOptions options) {
+    return DataSourceV2Utils.failForUserSpecifiedSchema(this);
+  }
+
+  /**
+   * Creates a {@link MicroBatchReadSupport} instance to scan the data from this streaming data
+   * source, which is called by Spark at the beginning of each micro-batch streaming query.
+   *
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+   *                           recovery. Readers for the same logical source in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReadSupport createMicroBatchReadSupport(
+      String checkpointLocation,
+      DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
deleted file mode 100644
index 80ac08e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability and scan the data from the data source.
- */
-@InterfaceStability.Evolving
-public interface ReadSupport extends DataSourceV2 {
-
-  /**
-   * Creates a {@link DataSourceReader} to scan the data from this data source.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param schema the user specified schema.
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   *
-   * By default this method throws {@link UnsupportedOperationException}, implementations should
-   * override this method to handle user specified schema.
-   */
-  default DataSourceReader createReader(StructType schema, DataSourceOptions options) {
-    String name;
-    if (this instanceof DataSourceRegister) {
-      name = ((DataSourceRegister) this).shortName();
-    } else {
-      name = this.getClass().getName();
-    }
-    throw new UnsupportedOperationException(name + " does not support user specified schema");
-  }
-
-  /**
-   * Creates a {@link DataSourceReader} to scan the data from this data source.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param options the options for the returned data source reader, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  DataSourceReader createReader(DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
index 9d66805..bbe430e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -27,10 +27,11 @@ import org.apache.spark.annotation.InterfaceStability;
 @InterfaceStability.Evolving
 public interface SessionConfigSupport extends DataSourceV2 {
 
-  /**
-   * Key prefix of the session configs to propagate. Spark will extract all session configs that
-   * starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -> yyy`
-   * into `xxx -> yyy`, and propagate them to all data source operations in this session.
-   */
-  String keyPrefix();
+  /**
+   * Key prefix of the session configs to propagate, which is usually the data source name. Spark
+   * will extract all session configs that start with `spark.datasource.$keyPrefix`, turn
+   * `spark.datasource.$keyPrefix.xxx -> yyy` into `xxx -> yyy`, and propagate them to all
+   * data source operations in this session.
+   */
+  String keyPrefix();
 }
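A quick illustration of the propagation rule above, as a hedged sketch (the `example` prefix and the config key are hypothetical, not taken from this commit):

    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.SessionConfigSupport;

    // Hypothetical source that opts into session config propagation.
    public class ExampleConfigSource implements DataSourceV2, SessionConfigSupport {
      @Override
      public String keyPrefix() {
        return "example";
      }
    }

    // After spark.conf.set("spark.datasource.example.username", "bob"), every
    // operation on this source in the session sees "username" -> "bob" in its
    // DataSourceOptions, as if the user had passed .option("username", "bob").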
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- */
-@InterfaceStability.Evolving
-public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link StreamWriter} to save the data to this data source. Data
-   * sources can return None if there is no writing needed to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link DataSourceWriter} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive epoch output means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   */
-  StreamWriter createStreamWriter(
-      String queryId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
new file mode 100644
index 0000000..f9ca85d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ *
+ * This interface is used to create {@link StreamingWriteSupport} instances when end users run
+ * {@code Dataset.writeStream.format(...).option(...).start()}.
+ */
+@InterfaceStability.Evolving
+public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
+
+  /**
+   * Creates a {@link StreamingWriteSupport} instance to save the data to this data source, which
+   * is called by Spark at the beginning of each streaming query.
+   *
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link StreamingWriteSupport} can use this id to distinguish itself from
+   *                others.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive epoch output means to this
+   *             sink, please refer to {@link OutputMode} for more details.
+   * @param options the options for the returned data source writer, which is an immutable
+   *                case-insensitive string-to-string map.
+   */
+  StreamingWriteSupport createStreamingWriteSupport(
+      String queryId,
+      StructType schema,
+      OutputMode mode,
+      DataSourceOptions options);
+}
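For symmetry with the read side, a hedged sketch of a sink mixing in the new streaming write provider; ExampleSink and ExampleStreamingWriteSupport are hypothetical names, and the latter (not shown) would implement the epoch-based commit protocol of StreamingWriteSupport:

    import org.apache.spark.sql.sources.v2.DataSourceOptions;
    import org.apache.spark.sql.sources.v2.DataSourceV2;
    import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider;
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical sink; one StreamingWriteSupport instance is created per query
    // and reused across its epochs.
    public class ExampleSink implements DataSourceV2, StreamingWriteSupportProvider {
      @Override
      public StreamingWriteSupport createStreamingWriteSupport(
          String queryId,
          StructType schema,
          OutputMode mode,
          DataSourceOptions options) {
        return new ExampleStreamingWriteSupport(queryId, schema, mode, options);
      }
    }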
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
deleted file mode 100644
index 048787a..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability and save the data to the data source.
- */
-@InterfaceStability.Evolving
-public interface WriteSupport extends DataSourceV2 {
-
-  /**
-   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
-   * sources can return None if there is no writing needed to be done according to the save mode.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   *
-   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
-   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
-   *                  use this job id to distinguish itself from other jobs.
-   * @param schema the schema of the data to be written.
-   * @param mode the save mode which determines what to do when the data are already in this data
-   *             source, please refer to {@link SaveMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   * @return a writer to append data to this data source
-   */
-  Optional<DataSourceWriter> createWriter(
-      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
new file mode 100644
index 0000000..452ee86
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface that defines how to load the data from a data source for batch processing.
+ *
+ * The execution engine will get an instance of this interface from a data source provider
+ * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
+ * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
+ * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
+ * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and a reader
+ * factory to scan data from the data source with a Spark job.
+ */
+@InterfaceStability.Evolving
+public interface BatchReadSupport extends ReadSupport {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
+   * {@link ScanConfig} for each data scanning job.
+   *
+   * The builder can take some query specific information to do operator pushdown, and keep this
+   * information in the created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link BatchReadSupport} need
+   * to take {@link ScanConfig} as an input.
+   */
+  ScanConfigBuilder newScanConfigBuilder();
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory(ScanConfig config);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
deleted file mode 100644
index dcb8771..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
-
-/**
- * A mix-in interface for {@link InputPartition}. Continuous input partitions can
- * implement this interface to provide creating {@link InputPartitionReader} with particular offset.
- */
-@InterfaceStability.Evolving
-public interface ContinuousInputPartition<T> extends InputPartition<T> {
-  /**
-   * Create an input partition reader with particular offset as its startOffset.
-   *
-   * @param offset offset want to set as the input partition reader's startOffset.
-   */
-  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
deleted file mode 100644
index da98fab..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.util.List;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A data source reader that is returned by
- * {@link ReadSupport#createReader(DataSourceOptions)} or
- * {@link ReadSupport#createReader(StructType, DataSourceOptions)}.
- * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link InputPartition}s, which are returned by
- * {@link #planInputPartitions()}.
- *
- * There are mainly 3 kinds of query optimizations:
- *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
- *      pruning), etc. Names of these interfaces start with `SupportsPushDown`.
- *   2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
- *      Names of these interfaces start with `SupportsReporting`.
- *   3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
- *
- * If an exception was throw when applying any of these query optimizations, the action will fail
- * and no Spark job will be submitted.
- *
- * Spark first applies all operator push-down optimizations that this data source supports. Then
- * Spark collects information this data source reported for further optimizations. Finally Spark
- * issues the scan request and does the actual data reading.
- */
-@InterfaceStability.Evolving
-public interface DataSourceReader {
-
-  /**
-   * Returns the actual schema of this data source reader, which may be different from the physical
-   * schema of the underlying storage, as column pruning or other optimizations may happen.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  StructType readSchema();
-
-  /**
-   * Returns a list of {@link InputPartition}s. Each {@link InputPartition} is responsible for
-   * creating a data reader to output data of one RDD partition. The number of input partitions
-   * returned here is the same as the number of RDD partitions this scan outputs.
-   *
-   * Note that, this may not be a full scan if the data source reader mixes in other optimization
-   * interfaces like column pruning, filter push-down, etc. These optimizations are applied before
-   * Spark issues the scan request.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  List<InputPartition<InternalRow>> planInputPartitions();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index f2038d0..95c30de 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -22,18 +22,18 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
- * responsible for creating the actual data reader of one RDD partition.
- * The relationship between {@link InputPartition} and {@link InputPartitionReader}
- * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
+ * A serializable representation of an input partition returned by
+ * {@link ReadSupport#planInputPartitions(ScanConfig)}.
  *
- * Note that {@link InputPartition}s will be serialized and sent to executors, then
- * {@link InputPartitionReader}s will be created on executors to do the actual reading. So
- * {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
- * be.
+ * Note that {@link InputPartition} will be serialized and sent to executors, then
+ * {@link PartitionReader} will be created by
+ * {@link PartitionReaderFactory#createReader(InputPartition)} or
+ * {@link PartitionReaderFactory#createColumnarReader(InputPartition)} on executors to do
+ * the actual reading. So {@link InputPartition} must be serializable while {@link PartitionReader}
+ * doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface InputPartition<T> extends Serializable {
+public interface InputPartition extends Serializable {
 
   /**
    * The preferred locations where the input partition reader returned by this partition can run
@@ -51,12 +51,4 @@ public interface InputPartition<T> extends Serializable {
   default String[] preferredLocations() {
     return new String[0];
   }
-
-  /**
-   * Returns an input partition reader to do the actual reading work.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
-   * get retried until hitting the maximum retry times.
-   */
-  InputPartitionReader<T> createPartitionReader();
 }
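The split between a serializable InputPartition and the reader created for it on the executor is the core of this refactoring. A minimal self-contained sketch under the new interfaces (RangeInputPartition and RangeReaderFactory are hypothetical illustration classes, not part of this commit):

    import java.io.IOException;

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.PartitionReader;
    import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;

    // The partition is just serializable data describing the split [start, end).
    class RangeInputPartition implements InputPartition {
      final int start;
      final int end;
      RangeInputPartition(int start, int end) { this.start = start; this.end = end; }
    }

    // The factory is also serializable and creates one reader per partition on
    // the executor side.
    class RangeReaderFactory implements PartitionReaderFactory {
      @Override
      public PartitionReader<InternalRow> createReader(InputPartition partition) {
        // Factories typically cast to the concrete partition class of the source.
        RangeInputPartition p = (RangeInputPartition) partition;
        return new PartitionReader<InternalRow>() {
          private int current = p.start - 1;

          @Override
          public boolean next() {
            return ++current < p.end;
          }

          @Override
          public InternalRow get() {
            // One int column per row; real sources would reuse row objects.
            return new GenericInternalRow(new Object[] { current });
          }

          @Override
          public void close() throws IOException {
            // No resources to release in this sketch.
          }
        };
      }
    }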
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is - * responsible for outputting data for a RDD partition. - * - * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow} - * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data - * source readers that mix in {@link SupportsScanColumnarBatch}. - */ -@InterfaceStability.Evolving -public interface InputPartitionReader<T> extends Closeable { - - /** - * Proceed to next record, returns false if there is no more records. - * - * If this method fails (by throwing an exception), the corresponding Spark task would fail and - * get retried until hitting the maximum retry times. - * - * @throws IOException if failure happens during disk/network IO like reading files. - */ - boolean next() throws IOException; - - /** - * Return the current record. This method should return same value until `next` is called. - * - * If this method fails (by throwing an exception), the corresponding Spark task would fail and - * get retried until hitting the maximum retry times. - */ - T get(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java new file mode 100644 index 0000000..04ff8d0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReader.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or + * {@link PartitionReaderFactory#createColumnarReader(InputPartition)}. It's responsible for + * outputting data for a RDD partition. + * + * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow} + * for normal data sources, or {@link org.apache.spark.sql.vectorized.ColumnarBatch} for columnar + * data sources(whose {@link PartitionReaderFactory#supportColumnarReads(InputPartition)} + * returns true). + */ +@InterfaceStability.Evolving +public interface PartitionReader<T> extends Closeable { + + /** + * Proceed to next record, returns false if there is no more records. + * + * @throws IOException if failure happens during disk/network IO like reading files. + */ + boolean next() throws IOException; + + /** + * Return the current record. This method should return same value until `next` is called. + */ + T get(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java new file mode 100644 index 0000000..f35de93 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +/** + * A factory used to create {@link PartitionReader} instances. + * + * If Spark fails to execute any methods in the implementations of this interface or in the returned + * {@link PartitionReader} (by throwing an exception), corresponding Spark task would fail and + * get retried until hitting the maximum retry times. + */ +@InterfaceStability.Evolving +public interface PartitionReaderFactory extends Serializable { + + /** + * Returns a row-based partition reader to read data from the given {@link InputPartition}. + * + * Implementations probably need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. 
+ */ + PartitionReader<InternalRow> createReader(InputPartition partition); + + /** + * Returns a columnar partition reader to read data from the given {@link InputPartition}. + * + * Implementations typically need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. + */ + default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) { + throw new UnsupportedOperationException("Cannot create columnar reader."); + } + + /** + * Returns true if the given {@link InputPartition} should be read by Spark in a columnar way. + * This means implementations must also implement {@link #createColumnarReader(InputPartition)} + * for the input partitions for which this method returns true. + * + * As of Spark 2.4, Spark can only read all input partitions in a columnar way, or none of them. + * A data source can't mix columnar and row-based partitions. This may be relaxed in future + * versions. + */ + default boolean supportColumnarReads(InputPartition partition) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java new file mode 100644 index 0000000..a58ddb2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for all the batch and streaming read supports. Data sources should implement + * concrete read support interfaces like {@link BatchReadSupport}. + * + * If Spark fails to execute any methods in the implementations of this interface (by throwing an + * exception), the read action will fail and no Spark job will be submitted. + */ +@InterfaceStability.Evolving +public interface ReadSupport { + + /** + * Returns the full schema of this data source, which is usually the physical schema of the + * underlying storage. This full schema should not be affected by column pruning or other + * optimizations. + */ + StructType fullSchema(); + + /** + * Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition} + * represents a data split that can be processed by one Spark task. The number of input + * partitions returned here is the same as the number of RDD partitions this scan outputs.
+ * + * Note that this may not be a full scan if the data source supports optimizations like filter + * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting + * {@link InputPartition input partitions}. + */ + InputPartition[] planInputPartitions(ScanConfig config); +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java new file mode 100644 index 0000000..7462ce2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.types.StructType; + +/** + * An interface that carries query-specific information for the data scanning job, like operator + * pushdown information and streaming query offsets. This is defined as an empty interface, and data + * sources should define their own {@link ScanConfig} classes. + * + * For APIs that take a {@link ScanConfig} as input, like + * {@link ReadSupport#planInputPartitions(ScanConfig)}, + * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and + * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations typically need + * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. + */ +@InterfaceStability.Evolving +public interface ScanConfig { + + /** + * Returns the actual schema of this data source reader, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted.
+ */ + StructType readSchema(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java new file mode 100644 index 0000000..4c0eedf --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfigBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for building the {@link ScanConfig}. Implementations can mix in the + * SupportsPushDownXYZ interfaces to do operator pushdown, and keep the operator pushdown result in + * the returned {@link ScanConfig}. + */ +@InterfaceStability.Evolving +public interface ScanConfigBuilder { + ScanConfig build(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java index e8cd7ad..44799c7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java @@ -23,7 +23,7 @@ import org.apache.spark.annotation.InterfaceStability; /** * An interface to represent statistics for a data source, which is returned by - * {@link SupportsReportStatistics#getStatistics()}. + * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}. */ @InterfaceStability.Evolving public interface Statistics { http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java deleted file mode 100644 index 595943c..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; - -import java.util.List; - -/** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to output {@link Row} instead of {@link InternalRow}. - * This is an experimental and unstable interface. - */ -@InterfaceStability.Unstable -public interface SupportsDeprecatedScanRow extends DataSourceReader { - default List<InputPartition<InternalRow>> planInputPartitions() { - throw new IllegalStateException( - "planInputPartitions not supported by default within SupportsDeprecatedScanRow"); - } - - List<InputPartition<Row>> planRowInputPartitions(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java index 4543c14..9d79a18 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownCatalystFilters.java @@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.catalyst.expressions.Expression; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * A mix-in interface for {@link ScanConfigBuilder}. Data source readers can implement this * interface to push down arbitrary expressions as predicates to the data source. * This is an experimental and unstable interface as {@link Expression} is not public and may get * changed in the future Spark versions. @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression; * process this interface. */ @InterfaceStability.Unstable -public interface SupportsPushDownCatalystFilters extends DataSourceReader { +public interface SupportsPushDownCatalystFilters extends ScanConfigBuilder { /** * Pushes down filters, and returns filters that need to be evaluated after scanning. 
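To make the new builder-based pushdown flow concrete, here is a minimal sketch of a ScanConfigBuilder that mixes in the pushdown interfaces from the hunks above and below. It is not part of this commit: MyScanConfig and MyScanConfigBuilder are invented names, and the toy builder simply accepts every filter, whereas a real source would keep only the filters it can evaluate and return the rest as residuals.

    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
    import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical immutable ScanConfig that carries the pushdown results.
    class MyScanConfig implements ScanConfig {
      private final StructType readSchema;
      private final Filter[] pushedFilters;

      MyScanConfig(StructType readSchema, Filter[] pushedFilters) {
        this.readSchema = readSchema;
        this.pushedFilters = pushedFilters;
      }

      @Override
      public StructType readSchema() {
        return readSchema; // already reflects the column pruning applied in the builder
      }

      Filter[] pushedFilters() {
        return pushedFilters; // consumed later, e.g. when planning input partitions
      }
    }

    // Hypothetical builder mixing in the pushdown interfaces.
    class MyScanConfigBuilder
        implements ScanConfigBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
      private StructType requiredSchema;
      private Filter[] pushed = new Filter[0];

      MyScanConfigBuilder(StructType fullSchema) {
        this.requiredSchema = fullSchema; // start from ReadSupport#fullSchema()
      }

      @Override
      public Filter[] pushFilters(Filter[] filters) {
        this.pushed = filters;  // toy source: accept every filter
        return new Filter[0];   // no residual filters for Spark to re-evaluate
      }

      @Override
      public Filter[] pushedFilters() {
        return pushed;
      }

      @Override
      public void pruneColumns(StructType requiredSchema) {
        this.requiredSchema = requiredSchema;
      }

      @Override
      public ScanConfig build() {
        return new MyScanConfig(requiredSchema, pushed); // freeze the pushdown state
      }
    }

Spark drives the mix-in methods (pruneColumns, pushFilters) on the builder and then calls build(); everything downstream reads the frozen pushdown state from the immutable ScanConfig.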
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java index b6a90a3..5d32a8a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java @@ -21,15 +21,15 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.Filter; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to push down filters to the data source and reduce the size of the data to be read. + * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this interface to + * push down filters to the data source and reduce the size of the data to be read. * * Note that, if data source readers implement both this interface and * {@link SupportsPushDownCatalystFilters}, Spark will ignore this interface and only process * {@link SupportsPushDownCatalystFilters}. */ @InterfaceStability.Evolving -public interface SupportsPushDownFilters extends DataSourceReader { +public interface SupportsPushDownFilters extends ScanConfigBuilder { /** * Pushes down filters, and returns filters that need to be evaluated after scanning. http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java index 427b4d0..edb1649 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java @@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this * interface to push down required columns to the data source and only read these columns during * scan to reduce the size of the data to be read. */ @InterfaceStability.Evolving -public interface SupportsPushDownRequiredColumns extends DataSourceReader { +public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder { /** * Applies column pruning w.r.t. the given requiredSchema. @@ -35,8 +35,8 @@ public interface SupportsPushDownRequiredColumns extends DataSourceReader { * also OK to do the pruning partially, e.g., a data source may not be able to prune nested * fields, and only prune top-level columns. * - * Note that, data source readers should update {@link DataSourceReader#readSchema()} after - * applying column pruning. + * Note that the {@link ScanConfig#readSchema()} implementation should take care of the column + * pruning applied here.
*/ void pruneColumns(StructType requiredSchema); } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java index 6b60da7..db62cd4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java @@ -21,17 +21,17 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; /** - * A mix in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to report data partitioning and try to avoid shuffle at Spark side. + * A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to + * report data partitioning and try to avoid a shuffle on the Spark side. * - * Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid - * adding a shuffle even if the reader does not implement this interface. + * Note that when a {@link ReadSupport} implementation creates exactly one {@link InputPartition}, + * Spark may avoid adding a shuffle even if the reader does not implement this interface. */ @InterfaceStability.Evolving -public interface SupportsReportPartitioning extends DataSourceReader { +public interface SupportsReportPartitioning extends ReadSupport { /** * Returns the output data partitioning that this reader guarantees. */ - Partitioning outputPartitioning(); + Partitioning outputPartitioning(ScanConfig config); } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java index 9263964..1831488 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java @@ -20,18 +20,18 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; /** - * A mix in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to report statistics to Spark. + * A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to + * report statistics to Spark. * - * Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader. - * Implementations that return more accurate statistics based on pushed operators will not improve - * query performance until the planner can push operators before getting stats. + * As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the + * data source. Implementations that return more accurate statistics based on pushed operators will + * not improve query performance until the planner can push operators before getting stats.
*/ @InterfaceStability.Evolving -public interface SupportsReportStatistics extends DataSourceReader { +public interface SupportsReportStatistics extends ReadSupport { /** - * Returns the basic statistics of this data source. + * Returns the estimated statistics of this data source scan. */ - Statistics getStatistics(); + Statistics estimateStatistics(ScanConfig config); } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java deleted file mode 100644 index f4da686..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import java.util.List; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.vectorized.ColumnarBatch; - -/** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to output {@link ColumnarBatch} and make the scan faster. - */ -@InterfaceStability.Evolving -public interface SupportsScanColumnarBatch extends DataSourceReader { - @Override - default List<InputPartition<InternalRow>> planInputPartitions() { - throw new IllegalStateException( - "planInputPartitions not supported by default within SupportsScanColumnarBatch."); - } - - /** - * Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data - * in batches. - */ - List<InputPartition<ColumnarBatch>> planBatchInputPartitions(); - - /** - * Returns true if the concrete data source reader can read data in batch according to the scan - * properties like required columns, pushes filters, etc. It's possible that the implementation - * can only support some certain columns with certain types. Users can overwrite this method and - * {@link #planInputPartitions()} to fallback to normal read path under some conditions. 
- */ - default boolean enableBatchRead() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java index 38ca5fc..6764d4b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java @@ -18,12 +18,12 @@ package org.apache.spark.sql.sources.v2.reader.partitioning; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; +import org.apache.spark.sql.sources.v2.reader.PartitionReader; /** * A concrete implementation of {@link Distribution}. Represents a distribution where records that * share the same values for the {@link #clusteredColumns} will be produced by the same - * {@link InputPartitionReader}. + * {@link PartitionReader}. */ @InterfaceStability.Evolving public class ClusteredDistribution implements Distribution { http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java index 5e32ba6..364a3f5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java @@ -18,14 +18,14 @@ package org.apache.spark.sql.sources.v2.reader.partitioning; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; +import org.apache.spark.sql.sources.v2.reader.PartitionReader; /** * An interface to represent data distribution requirement, which specifies how the records should - * be distributed among the data partitions (one {@link InputPartitionReader} outputs data for one + * be distributed among the data partitions (one {@link PartitionReader} outputs data for one * partition). * Note that this interface has nothing to do with the data ordering inside one - * partition(the output records of a single {@link InputPartitionReader}). + * partition (the output records of a single {@link PartitionReader}). * * The instance of this interface is created and provided by Spark, then consumed by * {@link Partitioning#satisfy(Distribution)}.
This means data source developers don't need to http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java index f460f6b..fb0b6f1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java @@ -19,12 +19,13 @@ package org.apache.spark.sql.sources.v2.reader.partitioning; import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning; /** * An interface to represent the output data partitioning for a data source, which is returned by - * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a - * snapshot. Once created, it should be deterministic and always report the same number of + * {@link SupportsReportPartitioning#outputPartitioning(ScanConfig)}. Note that this should work + * like a snapshot. Once created, it should be deterministic and always report the same number of * partitions and the same "satisfy" result for a certain distribution. */ @InterfaceStability.Evolving
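Putting the refactored read path together, here is a minimal end-to-end sketch of a toy source that emits the integers 0 to 9 in two partitions. It is illustrative only: RangeReadSupport, RangePartition and RangeReaderFactory are invented names, and the ScanConfigBuilder/BatchReadSupport plumbing and all error handling are elided.

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.PartitionReader;
    import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
    import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical split description. Real sources would carry file blocks,
    // Kafka offset ranges, etc.; InputPartition is Serializable, so keep fields simple.
    class RangePartition implements InputPartition {
      final int start;
      final int end;

      RangePartition(int start, int end) {
        this.start = start;
        this.end = end;
      }
    }

    class RangeReadSupport implements ReadSupport {
      @Override
      public StructType fullSchema() {
        // Physical schema of the underlying "storage", unaffected by pruning.
        return new StructType().add("value", DataTypes.IntegerType);
      }

      @Override
      public InputPartition[] planInputPartitions(ScanConfig config) {
        // A real source would consult `config` for pushed filters and pruned columns.
        return new InputPartition[] { new RangePartition(0, 5), new RangePartition(5, 10) };
      }
    }

    class RangeReaderFactory implements PartitionReaderFactory {
      @Override
      public PartitionReader<InternalRow> createReader(InputPartition partition) {
        RangePartition p = (RangePartition) partition; // cast to the source's concrete class
        return new PartitionReader<InternalRow>() {
          private int current = p.start - 1;

          @Override
          public boolean next() { // advance the cursor; false once the range is exhausted
            current += 1;
            return current < p.end;
          }

          @Override
          public InternalRow get() { // must return the same row until next() is called
            return new GenericInternalRow(new Object[] { current });
          }

          @Override
          public void close() {
            // nothing to release for an in-memory range
          }
        };
      }
    }

The division of labour follows the interfaces above: ReadSupport plans serializable input partitions on the driver, the Serializable PartitionReaderFactory is shipped to the executors, and each PartitionReader iterates one partition through next()/get() before being closed at the end of the task.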