[SPARK-23268][SQL] Reorganize packages in data source V2

## What changes were proposed in this pull request?

1. Create a new package for the partitioning/distribution related classes. Since Spark will add new concrete implementations of `Distribution` in future releases, these classes deserve a dedicated package (an illustrative sketch follows this item).
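To illustrate the contract these relocated classes define, here is a minimal Java sketch of a source reporting its output partitioning. It is hypothetical and not part of this commit: the `HashColumnPartitioning` name and its fields are invented; only `Partitioning`, `Distribution`, and `ClusteredDistribution` come from the new package.

```
// Hypothetical example (not part of this commit): a data source's output
// partitioning, hash-clustered on a single column, using the new
// org.apache.spark.sql.sources.v2.reader.partitioning package.
import java.util.Arrays;

import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

public class HashColumnPartitioning implements Partitioning {
  private final String column;
  private final int numPartitions;

  public HashColumnPartitioning(String column, int numPartitions) {
    this.column = column;
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    if (distribution instanceof ClusteredDistribution) {
      // Records that agree on all clustered columns also agree on `column`
      // when `column` is one of them, so they hash to the same partition.
      String[] cols = ((ClusteredDistribution) distribution).clusteredColumns;
      return Arrays.asList(cols).contains(column);
    }
    // Spark may add new Distribution implementations in future releases;
    // per the Partitioning contract, return false for anything unrecognized.
    return false;
  }
}
```

Returning false conservatively only costs an extra shuffle, whereas wrongly returning true would produce incorrect results, which is why the contract asks for false on unrecognized distributions.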
2. Move the streaming-related classes to `org.apache.spark.sql.sources.v2.reader/writer.streaming` instead of `org.apache.spark.sql.sources.v2.streaming.reader/writer`, so that there is no `reader`/`writer` package nested inside the `streaming` package, which was quite confusing.

Before change:

```
v2
├── reader
├── streaming
│   ├── reader
│   └── writer
└── writer
```

After change:

```
v2
├── reader
│   └── streaming
└── writer
    └── streaming
```

## How was this patch tested?

Unit test.

Author: Wang Gengliang <ltn...@gmail.com>

Closes #20435 from gengliangwang/new_pkg.

(cherry picked from commit 56ae32657e9e5d1e30b62afe77d9e14eb07cf4fb)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59e89a29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59e89a29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59e89a29

Branch: refs/heads/branch-2.3
Commit: 59e89a2990e4f66839c91f48f41157dac6e670ad
Parents: 0d0f579
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Wed Jan 31 20:33:51 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Jan 31 20:34:01 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    |  2 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |  2 +-
 .../sql/kafka010/KafkaSourceProvider.scala      |  5 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  2 +-
 .../v2/reader/ClusteredDistribution.java        | 38 ----------
 .../v2/reader/ContinuousReadSupport.java        | 48 ++++++++++++
 .../sql/sources/v2/reader/Distribution.java     | 39 ----------
 .../v2/reader/MicroBatchReadSupport.java        | 54 +++++++++++++
 .../sql/sources/v2/reader/Partitioning.java     | 46 ------------
 .../v2/reader/SupportsReportPartitioning.java   |  1 +
 .../partitioning/ClusteredDistribution.java     | 39 ++++++++++
 .../v2/reader/partitioning/Distribution.java    | 40 ++++++++++
 .../v2/reader/partitioning/Partitioning.java    | 48 ++++++++++++
 .../reader/streaming/ContinuousDataReader.java  | 36 +++++++++
 .../v2/reader/streaming/ContinuousReader.java   | 79 ++++++++++++++++++++
 .../v2/reader/streaming/MicroBatchReader.java   | 75 +++++++++++++++++++
 .../sql/sources/v2/reader/streaming/Offset.java | 69 +++++++++++++++++
 .../v2/reader/streaming/PartitionOffset.java    | 32 ++++++++
 .../v2/streaming/ContinuousReadSupport.java     | 48 ------------
 .../v2/streaming/MicroBatchReadSupport.java     | 54 --------------
 .../v2/streaming/StreamWriteSupport.java        | 54 --------------
 .../streaming/reader/ContinuousDataReader.java  | 36 ---------
 .../v2/streaming/reader/ContinuousReader.java   | 79 --------------------
 .../v2/streaming/reader/MicroBatchReader.java   | 75 -------------------
 .../sql/sources/v2/streaming/reader/Offset.java | 69 -----------------
 .../v2/streaming/reader/PartitionOffset.java    | 32 --------
 .../v2/streaming/writer/StreamWriter.java       | 72 ------------------
 .../sql/sources/v2/writer/DataSourceWriter.java |  2 +-
 .../sources/v2/writer/StreamWriteSupport.java   | 53 +++++++++++++
 .../v2/writer/streaming/StreamWriter.java       | 72 ++++++++++++++++++
 .../datasources/v2/DataSourcePartitioning.scala |  2 +-
 .../datasources/v2/DataSourceV2ScanExec.scala   |  2 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |  2 +-
 .../streaming/MicroBatchExecution.scala         |  6 +-
 .../streaming/RateSourceProvider.scala          |  5 +-
 .../execution/streaming/RateStreamOffset.scala  |  2 +-
 .../execution/streaming/StreamingRelation.scala |  2 +-
 .../spark/sql/execution/streaming/console.scala |  4 +-
 .../ContinuousDataSourceRDDIter.scala           | 10 +--
 .../continuous/ContinuousExecution.scala        |  5 +-
 .../continuous/ContinuousRateStreamSource.scala |  2 +-
 .../streaming/continuous/EpochCoordinator.scala |  4 +-
 .../streaming/sources/ConsoleWriter.scala       |  2 +-
 .../streaming/sources/MicroBatchWriter.scala    |  2 +-
 .../streaming/sources/RateStreamSourceV2.scala  |  3 +-
 .../execution/streaming/sources/memoryV2.scala  |  3 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  2 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../sql/streaming/StreamingQueryManager.scala   |  2 +-
 .../v2/JavaPartitionAwareDataSource.java        |  3 +
 .../execution/streaming/RateSourceV2Suite.scala |  2 +-
 .../sql/sources/v2/DataSourceV2Suite.scala      |  1 +
 .../sources/StreamingDataSourceV2Suite.scala    |  8 +-
 53 files changed, 690 insertions(+), 687 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 8c73342..41c443b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRo import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index c82154c..8d41c0d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset} +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset} /** * An [[Offset]] for the [[KafkaSource]].
This one tracks all partitions of subscribed topics and http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 85e96b6..694ca76 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -31,8 +31,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport} -import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter +import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport +import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala index a24efde..9307bfc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala @@ -22,8 +22,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery -import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java deleted file mode 100644 index 27905e3..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * A concrete implementation of {@link Distribution}. Represents a distribution where records that - * share the same values for the {@link #clusteredColumns} will be produced by the same - * {@link DataReader}. - */ -@InterfaceStability.Evolving -public class ClusteredDistribution implements Distribution { - - /** - * The names of the clustered columns. Note that they are order insensitive. - */ - public final String[] clusteredColumns; - - public ClusteredDistribution(String[] clusteredColumns) { - this.clusteredColumns = clusteredColumns; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java new file mode 100644 index 0000000..0c1d5d1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability for continuous stream processing. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends DataSourceV2 { + /** + * Creates a {@link ContinuousReader} to scan the data from this data source. + * + * @param schema the user provided schema, or empty() if none was provided + * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure + * recovery. Readers for the same logical source in the same query + * will be given the same checkpointLocation. 
+ * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. + */ + ContinuousReader createContinuousReader( + Optional<StructType> schema, + String checkpointLocation, + DataSourceOptions options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java deleted file mode 100644 index b375621..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An interface to represent data distribution requirement, which specifies how the records should - * be distributed among the data partitions(one {@link DataReader} outputs data for one partition). - * Note that this interface has nothing to do with the data ordering inside one - * partition(the output records of a single {@link DataReader}). - * - * The instance of this interface is created and provided by Spark, then consumed by - * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to - * implement this interface, but need to catch as more concrete implementations of this interface - * as possible in {@link Partitioning#satisfy(Distribution)}. - * - * Concrete implementations until now: - * <ul> - * <li>{@link ClusteredDistribution}</li> - * </ul> - */ -@InterfaceStability.Evolving -public interface Distribution {} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java new file mode 100644 index 0000000..5e8f0c0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide streaming micro-batch data reading ability. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends DataSourceV2 { + /** + * Creates a {@link MicroBatchReader} to read batches of data from this data source in a + * streaming query. + * + * The execution engine will create a micro-batch reader at the start of a streaming query, + * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and + * then call stop() when the execution is complete. Note that a single query may have multiple + * executions due to restart or failure recovery. + * + * @param schema the user provided schema, or empty() if none was provided + * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure + * recovery. Readers for the same logical source in the same query + * will be given the same checkpointLocation. + * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. + */ + MicroBatchReader createMicroBatchReader( + Optional<StructType> schema, + String checkpointLocation, + DataSourceOptions options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java deleted file mode 100644 index 5e334d1..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.reader; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An interface to represent the output data partitioning for a data source, which is returned by - * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a - * snapshot. Once created, it should be deterministic and always report the same number of - * partitions and the same "satisfy" result for a certain distribution. - */ -@InterfaceStability.Evolving -public interface Partitioning { - - /** - * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs. - */ - int numPartitions(); - - /** - * Returns true if this partitioning can satisfy the given distribution, which means Spark does - * not need to shuffle the output data of this data source for some certain operations. - * - * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases. - * This method should be aware of it and always return false for unrecognized distributions. It's - * recommended to check every Spark new release and support new distributions if possible, to - * avoid shuffle at Spark side for more cases. - */ - boolean satisfy(Distribution distribution); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java index a2383a9..5405a91 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; /** * A mix in interface for {@link DataSourceReader}. Data source readers can implement this http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java new file mode 100644 index 0000000..2d0ee50 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.partitioning; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataReader; + +/** + * A concrete implementation of {@link Distribution}. Represents a distribution where records that + * share the same values for the {@link #clusteredColumns} will be produced by the same + * {@link DataReader}. + */ +@InterfaceStability.Evolving +public class ClusteredDistribution implements Distribution { + + /** + * The names of the clustered columns. Note that they are order insensitive. + */ + public final String[] clusteredColumns; + + public ClusteredDistribution(String[] clusteredColumns) { + this.clusteredColumns = clusteredColumns; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java new file mode 100644 index 0000000..f6b111f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.partitioning; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataReader; + +/** + * An interface to represent data distribution requirement, which specifies how the records should + * be distributed among the data partitions(one {@link DataReader} outputs data for one partition). + * Note that this interface has nothing to do with the data ordering inside one + * partition(the output records of a single {@link DataReader}). + * + * The instance of this interface is created and provided by Spark, then consumed by + * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to + * implement this interface, but need to catch as more concrete implementations of this interface + * as possible in {@link Partitioning#satisfy(Distribution)}. 
+ * + * Concrete implementations until now: + * <ul> + * <li>{@link ClusteredDistribution}</li> + * </ul> + */ +@InterfaceStability.Evolving +public interface Distribution {} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java new file mode 100644 index 0000000..309d9e5 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.partitioning; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataReaderFactory; +import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning; + +/** + * An interface to represent the output data partitioning for a data source, which is returned by + * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a + * snapshot. Once created, it should be deterministic and always report the same number of + * partitions and the same "satisfy" result for a certain distribution. + */ +@InterfaceStability.Evolving +public interface Partitioning { + + /** + * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs. + */ + int numPartitions(); + + /** + * Returns true if this partitioning can satisfy the given distribution, which means Spark does + * not need to shuffle the output data of this data source for some certain operations. + * + * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases. + * This method should be aware of it and always return false for unrecognized distributions. It's + * recommended to check every Spark new release and support new distributions if possible, to + * avoid shuffle at Spark side for more cases. 
+ */ + boolean satisfy(Distribution distribution); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java new file mode 100644 index 0000000..47d2644 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataReader; + +/** + * A variation on {@link DataReader} for use with streaming in continuous processing mode. + */ +@InterfaceStability.Evolving +public interface ContinuousDataReader<T> extends DataReader<T> { + /** + * Get the offset of the current record, or the start offset if no records have been read. + * + * The execution engine will call this method along with get() to keep track of the current + * offset. When an epoch ends, the offset of the previous record in each partition will be saved + * as a restart checkpoint. + */ + PartitionOffset getOffset(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java new file mode 100644 index 0000000..d1d1e7f --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to allow reading in a continuous processing mode stream. + * + * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}. + * + * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with + * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. + */ +@InterfaceStability.Evolving +public interface ContinuousReader extends BaseStreamingSource, DataSourceReader { + /** + * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each + * partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Set the desired start offset for reader factories created from this reader. The scan will + * start from the first record after the provided offset, or from an implementation-defined + * inferred starting point if no offset is provided. + */ + void setOffset(Optional<Offset> start); + + /** + * Return the specified or inferred start offset for this reader. + * + * @throws IllegalStateException if setOffset has not been called + */ + Offset getStartOffset(); + + /** + * The execution engine will call this method in every epoch to determine if new reader + * factories need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new reader. + */ + default boolean needsReconfiguration() { + return false; + } + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java new file mode 100644 index 0000000..67ebde3 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to indicate they allow micro-batch streaming reads. + * + * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with + * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. + */ +@InterfaceStability.Evolving +public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource { + /** + * Set the desired offset range for reader factories created from this reader. Reader factories + * will generate only data within (`start`, `end`]; that is, from the first record after `start` + * to the record with offset `end`. + * + * @param start The initial offset to scan from. If not specified, scan from an + * implementation-specified start point, such as the earliest available record. + * @param end The last offset to include in the scan. If not specified, scan up to an + * implementation-defined endpoint, such as the last available offset + * or the start offset plus a target batch size. + */ + void setOffsetRange(Optional<Offset> start, Optional<Offset> end); + + /** + * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset + * for this reader. + * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getStartOffset(); + + /** + * Return the specified (if explicitly set through setOffsetRange) or inferred end offset + * for this reader. + * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getEndOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java new file mode 100644 index 0000000..e41c035 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An abstract representation of progress through a {@link MicroBatchReader} or + * {@link ContinuousReader}. + * During execution, offsets provided by the data source implementation will be logged and used as + * restart checkpoints. Each source should provide an offset implementation which the source can use + * to reconstruct a position in the stream up to which data has been seen/processed. + * + * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to + * maintain compatibility with DataSource V1 APIs. This extension will be removed once we + * get rid of V1 completely. + */ +@InterfaceStability.Evolving +public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset { + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * Note: We assume that equivalent/equal offsets serialize to + * identical JSON strings. + * + * @return JSON string encoding + */ + public abstract String json(); + + /** + * Equality based on JSON string representation. We leverage the + * JSON representation for normalization between the Offset's + * in deserialized and serialized representations. + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) { + return this.json() + .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.json().hashCode(); + } + + @Override + public String toString() { + return this.json(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java new file mode 100644 index 0000000..383e73d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import java.io.Serializable; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * Used for per-partition offsets in continuous processing. ContinuousReader implementations will + * provide a method to merge these into a global Offset. + * + * These offsets must be serializable. + */ +@InterfaceStability.Evolving +public interface PartitionOffset extends Serializable { +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java deleted file mode 100644 index f79424e..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming; - -import java.util.Optional; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data reading ability for continuous stream processing. - */ -@InterfaceStability.Evolving -public interface ContinuousReadSupport extends DataSourceV2 { - /** - * Creates a {@link ContinuousReader} to scan the data from this data source. - * - * @param schema the user provided schema, or empty() if none was provided - * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure - * recovery. Readers for the same logical source in the same query - * will be given the same checkpointLocation. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. 
- */ - ContinuousReader createContinuousReader( - Optional<StructType> schema, - String checkpointLocation, - DataSourceOptions options); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java deleted file mode 100644 index 22660e4..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming; - -import java.util.Optional; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide streaming micro-batch data reading ability. - */ -@InterfaceStability.Evolving -public interface MicroBatchReadSupport extends DataSourceV2 { - /** - * Creates a {@link MicroBatchReader} to read batches of data from this data source in a - * streaming query. - * - * The execution engine will create a micro-batch reader at the start of a streaming query, - * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and - * then call stop() when the execution is complete. Note that a single query may have multiple - * executions due to restart or failure recovery. - * - * @param schema the user provided schema, or empty() if none was provided - * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure - * recovery. Readers for the same logical source in the same query - * will be given the same checkpointLocation. - * @param options the options for the returned data source reader, which is an immutable - * case-insensitive string-to-string map. 
- */ - MicroBatchReader createMicroBatchReader( - Optional<StructType> schema, - String checkpointLocation, - DataSourceOptions options); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java deleted file mode 100644 index 7c5f304..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.execution.streaming.BaseStreamingSink; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to - * provide data writing ability for structured streaming. - */ -@InterfaceStability.Evolving -public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink { - - /** - * Creates an optional {@link StreamWriter} to save the data to this data source. Data - * sources can return None if there is no writing needed to be done. - * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, and the returned - * {@link DataSourceWriter} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. - * @param mode the output mode which determines what successive epoch output means to this - * sink, please refer to {@link OutputMode} for more details. - * @param options the options for the returned data source writer, which is an immutable - * case-insensitive string-to-string map. 
- */ - StreamWriter createStreamWriter( - String queryId, - StructType schema, - OutputMode mode, - DataSourceOptions options); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java deleted file mode 100644 index 3f13a4d..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.reader; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataReader; - -/** - * A variation on {@link DataReader} for use with streaming in continuous processing mode. - */ -@InterfaceStability.Evolving -public interface ContinuousDataReader<T> extends DataReader<T> { - /** - * Get the offset of the current record, or the start offset if no records have been read. - * - * The execution engine will call this method along with get() to keep track of the current - * offset. When an epoch ends, the offset of the previous record in each partition will be saved - * as a restart checkpoint. - */ - PartitionOffset getOffset(); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java deleted file mode 100644 index 6e5177e..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.reader; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; - -import java.util.Optional; - -/** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to allow reading data in continuous processing mode. - * - * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}. - * - * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with - * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. - */ -@InterfaceStability.Evolving -public interface ContinuousReader extends BaseStreamingSource, DataSourceReader { - /** - * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each - * partition into a single global offset. - */ - Offset mergeOffsets(PartitionOffset[] offsets); - - /** - * Deserialize a JSON string into an Offset of the implementation-defined offset type. - * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader - */ - Offset deserializeOffset(String json); - - /** - * Set the desired start offset for reader factories created from this reader. The scan will - * start from the first record after the provided offset, or from an implementation-defined - * inferred starting point if no offset is provided. - */ - void setOffset(Optional<Offset> start); - - /** - * Return the specified or inferred start offset for this reader. - * - * @throws IllegalStateException if setOffset has not been called - */ - Offset getStartOffset(); - - /** - * The execution engine will call this method in every epoch to determine if new reader - * factories need to be generated, which may be required if, for example, the underlying - * source system has had partitions added or removed. - * - * If true, the query will be shut down and restarted with a new reader. - */ - default boolean needsReconfiguration() { - return false; - } - - /** - * Informs the source that Spark has completed processing all data for offsets less than or - * equal to `end` and will only request offsets greater than `end` in the future. - */ - void commit(Offset end); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java deleted file mode 100644 index fcec446..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.reader; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.execution.streaming.BaseStreamingSource; - -import java.util.Optional; - -/** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to indicate they allow micro-batch streaming reads. - * - * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with - * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely. - */ -@InterfaceStability.Evolving -public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource { - /** - * Set the desired offset range for reader factories created from this reader. Reader factories - * will generate only data within (`start`, `end`]; that is, from the first record after `start` - * to the record with offset `end`. - * - * @param start The initial offset to scan from. If not specified, scan from an - * implementation-specified start point, such as the earliest available record. - * @param end The last offset to include in the scan. If not specified, scan up to an - * implementation-defined endpoint, such as the last available offset - * or the start offset plus a target batch size. - */ - void setOffsetRange(Optional<Offset> start, Optional<Offset> end); - - /** - * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset - * for this reader. - * - * @throws IllegalStateException if setOffsetRange has not been called - */ - Offset getStartOffset(); - - /** - * Return the specified (if explicitly set through setOffsetRange) or inferred end offset - * for this reader. - * - * @throws IllegalStateException if setOffsetRange has not been called - */ - Offset getEndOffset(); - - /** - * Deserialize a JSON string into an Offset of the implementation-defined offset type. - * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader - */ - Offset deserializeOffset(String json); - - /** - * Informs the source that Spark has completed processing all data for offsets less than or - * equal to `end` and will only request offsets greater than `end` in the future. 
- */ - void commit(Offset end); -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java deleted file mode 100644 index abba3e7..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.reader; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * An abstract representation of progress through a {@link MicroBatchReader} or - * {@link ContinuousReader}. - * During execution, offsets provided by the data source implementation will be logged and used as - * restart checkpoints. Each source should provide an offset implementation that the source can use - * to reconstruct a position in the stream up to which data has been seen/processed. - * - * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to - * maintain compatibility with DataSource V1 APIs. This extension will be removed once we - * get rid of V1 completely. - */ -@InterfaceStability.Evolving -public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset { - /** - * A JSON-serialized representation of an Offset that is - * used for saving offsets to the offset log. - * Note: We assume that equivalent/equal offsets serialize to - * identical JSON strings. - * - * @return JSON string encoding - */ - public abstract String json(); - - /** - * Equality is based on the JSON string representation. We leverage the - * JSON representation to normalize between an Offset's - * deserialized and serialized representations.
- */ - @Override - public boolean equals(Object obj) { - if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) { - return this.json() - .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json()); - } else { - return false; - } - } - - @Override - public int hashCode() { - return this.json().hashCode(); - } - - @Override - public String toString() { - return this.json(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java deleted file mode 100644 index 4688b85..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.reader; - -import java.io.Serializable; - -import org.apache.spark.annotation.InterfaceStability; - -/** - * Used for per-partition offsets in continuous processing. ContinuousReader implementations will - * provide a method to merge these into a global Offset. - * - * These offsets must be serializable. - */ -@InterfaceStability.Evolving -public interface PartitionOffset extends Serializable { -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java deleted file mode 100644 index 915ee6c..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2.streaming.writer; - -import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; - -/** - * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and - * aborts relative to an epoch ID determined by the execution engine. - * - * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs, - * and so must reset any internal state after a successful commit. - */ -@InterfaceStability.Evolving -public interface StreamWriter extends DataSourceWriter { - /** - * Commits this writing job for the specified epoch with a list of commit messages. The commit - * messages are collected from successful data writers and are produced by - * {@link DataWriter#commit()}. - * - * If this method fails (by throwing an exception), this writing job is considered to have - * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. - * - * To support exactly-once processing, writer implementations should ensure that this method is - * idempotent. The execution engine may call commit() multiple times for the same epoch - * in some circumstances. - */ - void commit(long epochId, WriterCommitMessage[] messages); - - /** - * Aborts this writing job because some data writers failed and kept failing when retried, or - * because the Spark job failed for some unknown reason, or because - * {@link #commit(WriterCommitMessage[])} failed. - * - * If this method fails (by throwing an exception), the underlying data source may require manual - * cleanup. - * - * Unless the abort is triggered by the failure of commit, the given messages may have some - * null slots, as there may be only a few data writers that committed before the abort - * happened, or some data writers may have committed but their commit messages had not reached - * the driver when the abort was triggered. So this is just a "best effort" for data sources to - * clean up the data left by data writers.
- */ - void abort(long epochId, WriterCommitMessage[] messages); - - default void commit(WriterCommitMessage[] messages) { - throw new UnsupportedOperationException( - "Commit without epoch should not be called with StreamWriter"); - } - - default void abort(WriterCommitMessage[] messages) { - throw new UnsupportedOperationException( - "Abort without epoch should not be called with StreamWriter"); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java index d89d27d..7096aec 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType; /** * A data source writer that is returned by * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/ - * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter( + * {@link StreamWriteSupport#createStreamWriter( * String, StructType, OutputMode, DataSourceOptions)}. * It can mix in various writing optimization interfaces to speed up the data saving. The actual * writing logic is delegated to {@link DataWriter}. http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java new file mode 100644 index 0000000..1c0e2e1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. 
Data sources can implement this interface to + provide data writing ability for structured streaming. + */ +@InterfaceStability.Evolving +public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink { + + /** + * Creates an optional {@link StreamWriter} to save the data to this data source. Data + * sources can return None if no writing needs to be done. + * + * @param queryId A unique string for the writing query. It's possible that there are many + * writing queries running at the same time, and the returned + * {@link DataSourceWriter} can use this id to distinguish itself from others. + * @param schema the schema of the data to be written. + * @param mode the output mode, which determines what successive epoch output means to this + * sink; please refer to {@link OutputMode} for more details. + * @param options the options for the returned data source writer, which is an immutable + * case-insensitive string-to-string map. + */ + StreamWriter createStreamWriter( + String queryId, + StructType schema, + OutputMode mode, + DataSourceOptions options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java new file mode 100644 index 0000000..4913341 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; +import org.apache.spark.sql.sources.v2.writer.DataWriter; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +/** + * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and + * aborts relative to an epoch ID determined by the execution engine. + * + * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs, + * and so must reset any internal state after a successful commit. + */ +@InterfaceStability.Evolving +public interface StreamWriter extends DataSourceWriter { + /** + * Commits this writing job for the specified epoch with a list of commit messages. The commit + * messages are collected from successful data writers and are produced by + * {@link DataWriter#commit()}.
+ * + * If this method fails (by throwing an exception), this writing job is considered to have + * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + * + * To support exactly-once processing, writer implementations should ensure that this method is + * idempotent. The execution engine may call commit() multiple times for the same epoch + * in some circumstances. + */ + void commit(long epochId, WriterCommitMessage[] messages); + + /** + * Aborts this writing job because some data writers failed and kept failing when retried, or + * because the Spark job failed for some unknown reason, or because + * {@link #commit(WriterCommitMessage[])} failed. + * + * If this method fails (by throwing an exception), the underlying data source may require manual + * cleanup. + * + * Unless the abort is triggered by the failure of commit, the given messages may have some + * null slots, as there may be only a few data writers that committed before the abort + * happened, or some data writers may have committed but their commit messages had not reached + * the driver when the abort was triggered. So this is just a "best effort" for data sources to + * clean up the data left by data writers. + */ + void abort(long epochId, WriterCommitMessage[] messages); + + default void commit(WriterCommitMessage[] messages) { + throw new UnsupportedOperationException( + "Commit without epoch should not be called with StreamWriter"); + } + + default void abort(WriterCommitMessage[] messages) { + throw new UnsupportedOperationException( + "Abort without epoch should not be called with StreamWriter"); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala index 943d010..017a673 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression} import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning} +import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Partitioning} /** * An adapter from public data source partitioning to catalyst internal `Partitioning`.
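For implementers, the practical effect of this reorganization is only a change of import paths; the interfaces themselves are unchanged. As a minimal illustration (not part of this commit: the `ColumnClusteredPartitioning` name and its subset check are a hypothetical sketch against the Spark 2.3 `Partitioning`, `Distribution`, and `ClusteredDistribution` signatures), a reader-reported partitioning now imports from the new `org.apache.spark.sql.sources.v2.reader.partitioning` package:

```
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
// Relocated by this commit: previously under org.apache.spark.sql.sources.v2.reader.
import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

// Hypothetical: a source whose output is hash-clustered by a fixed set of columns.
class ColumnClusteredPartitioning implements Partitioning {
  private final String[] clusteredColumns;
  private final int numPartitions;

  ColumnClusteredPartitioning(String[] clusteredColumns, int numPartitions) {
    this.clusteredColumns = clusteredColumns;
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public boolean satisfy(Distribution distribution) {
    if (distribution instanceof ClusteredDistribution) {
      // If our clustering columns are a subset of the requested ones, rows that
      // agree on the requested columns also agree on ours and therefore share a
      // partition, so the requested distribution is satisfied.
      Set<String> requested =
          new HashSet<>(Arrays.asList(((ClusteredDistribution) distribution).clusteredColumns));
      return requested.containsAll(Arrays.asList(clusteredColumns));
    }
    return false;
  }
}
```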
http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index ee08582..df469af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index c544adb..6592bd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions} -import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/59e89a29/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 93572f7..d9aa857 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter} import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport} -import 
org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2} -import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow +import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.{Clock, Utils}
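The writer-side move is symmetric: `StreamWriteSupport` now lives in `org.apache.spark.sql.sources.v2.writer`, and `StreamWriter` in `org.apache.spark.sql.sources.v2.writer.streaming`. A minimal no-op sketch of a sink against the relocated interfaces (not part of this commit: the `NoOpSink` name and the empty method bodies are hypothetical, and a real sink would return a working `DataWriterFactory`):

```
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class NoOpSink implements StreamWriteSupport {
  @Override
  public StreamWriter createStreamWriter(
      String queryId, StructType schema, OutputMode mode, DataSourceOptions options) {
    return new StreamWriter() {
      @Override
      public DataWriterFactory<Row> createWriterFactory() {
        // A real sink returns a factory that produces DataWriter<Row> instances.
        throw new UnsupportedOperationException("illustrative sketch only");
      }

      @Override
      public void commit(long epochId, WriterCommitMessage[] messages) {
        // Per-epoch commit; implementations should be idempotent, since Spark
        // may replay commit() for the same epochId.
      }

      @Override
      public void abort(long epochId, WriterCommitMessage[] messages) {
        // Best-effort cleanup; some slots in `messages` may be null.
      }
    };
  }
}
```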