[SPARK-22732] Add Structured Streaming APIs to DataSourceV2

## What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High-level summary:

- DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user-specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
- DataSourceV2Reader includes new interfaces to read a specific micro-batch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
- DataReader (the per-partition reader) has a new subinterface, ContinuousDataReader, used only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
- Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
- DataSourceV2Writer has a new subinterface, ContinuousWriter, used only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

## How was this patch tested?

Toy implementations of the new interfaces with unit tests.

Author: Jose Torres <[email protected]>

Closes #19925 from joseph-torres/continuous-api.
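The Offset bullet above, together with the `(start, end]` contract documented on `MicroBatchReader.setOffsetRange` in the diff, can be illustrated without Spark on the classpath. This is a minimal, hypothetical sketch: `Offset` is re-declared locally to mirror the abstract class added in this PR (equality, hashing and `toString` all delegate to `json()`), while `CounterOffset`, `fromJson` and `OffsetRanges.recordsInRange` are illustrative names, not Spark API. It assumes a source whose records are numbered from 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Local stand-in for org.apache.spark.sql.sources.v2.reader.Offset, so this
// sketch compiles without Spark. Equality is defined purely by the JSON form.
abstract class Offset {
  public abstract String json();

  @Override
  public boolean equals(Object obj) {
    return obj instanceof Offset && json().equals(((Offset) obj).json());
  }

  @Override
  public int hashCode() {
    return json().hashCode();
  }

  @Override
  public String toString() {
    return json();
  }
}

// Hypothetical offset for a source whose records are numbered 0, 1, 2, ...
final class CounterOffset extends Offset {
  final long value;

  CounterOffset(long value) {
    this.value = value;
  }

  @Override
  public String json() {
    return Long.toString(value);
  }

  // Hypothetical counterpart to deserializeOffset(String json): malformed input
  // is rejected with IllegalArgumentException, as the reader Javadoc requires.
  static CounterOffset fromJson(String json) {
    try {
      return new CounterOffset(Long.parseLong(json.trim()));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Not a valid CounterOffset: " + json, e);
    }
  }
}

// Hypothetical helper applying the (start, end] contract of setOffsetRange.
final class OffsetRanges {
  private OffsetRanges() {}

  // An empty start means "from the earliest record", assumed here to be record 0.
  static List<Long> recordsInRange(Optional<CounterOffset> start, CounterOffset end) {
    long first = start.map(s -> s.value + 1).orElse(0L);
    List<Long> records = new ArrayList<>();
    for (long i = first; i <= end.value; i++) {
      records.add(i);
    }
    return records;
  }
}
```

Because `equals` compares JSON, an offset read back from the offset log compares equal to the in-memory offset that produced it, which is why the Offset Javadoc requires equal offsets to serialize to identical JSON strings.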
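In continuous processing, each partition's ContinuousDataReader reports a PartitionOffset, and the ContinuousReader merges them into a single global Offset that serves as the restart checkpoint. The sketch below is hypothetical and uses standalone stand-ins so it compiles without Spark: `IndexedPartitionOffset` (a partition id plus the last record read), `IndexedGlobalOffset` and `ContinuousOffsets.mergeOffsets` are illustrative names, and the JSON layout is an assumption. Positions are kept in a `TreeMap` so the same set of partition positions always serializes to the same JSON string.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Local stand-in for the PartitionOffset marker interface from this PR.
interface PartitionOffset extends Serializable {}

// Hypothetical per-partition position: the last record number one partition emitted.
final class IndexedPartitionOffset implements PartitionOffset {
  final int partition;
  final long lastRecord;

  IndexedPartitionOffset(int partition, long lastRecord) {
    this.partition = partition;
    this.lastRecord = lastRecord;
  }
}

// Hypothetical global offset; a real reader would make this extend Offset.
final class IndexedGlobalOffset {
  // TreeMap gives a deterministic key order, so equal offsets yield identical JSON.
  private final Map<Integer, Long> positions;

  IndexedGlobalOffset(Map<Integer, Long> positions) {
    this.positions = new TreeMap<>(positions);
  }

  String json() {
    StringJoiner joiner = new StringJoiner(",", "{", "}");
    for (Map.Entry<Integer, Long> e : positions.entrySet()) {
      joiner.add("\"" + e.getKey() + "\":" + e.getValue());
    }
    return joiner.toString();
  }
}

final class ContinuousOffsets {
  private ContinuousOffsets() {}

  // Sketch of a mergeOffsets implementation: exactly one entry per partition.
  static IndexedGlobalOffset mergeOffsets(PartitionOffset[] offsets) {
    Map<Integer, Long> positions = new TreeMap<>();
    for (PartitionOffset offset : offsets) {
      if (!(offset instanceof IndexedPartitionOffset)) {
        throw new IllegalArgumentException("Unexpected offset type: " + offset);
      }
      IndexedPartitionOffset p = (IndexedPartitionOffset) offset;
      if (positions.put(p.partition, p.lastRecord) != null) {
        throw new IllegalArgumentException("Duplicate offset for partition " + p.partition);
      }
    }
    return new IndexedGlobalOffset(positions);
  }
}
```

Rejecting duplicate partitions is a design choice of this sketch; it assumes each partition reports exactly once per merge.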
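The ContinuousWriter bullet above describes commits tagged with an epoch number, while the epoch-less `commit(WriterCommitMessage[])` inherited from DataSourceV2Writer is disabled. A minimal sketch of that shape under stated assumptions: `EpochWriter` mirrors only the two commit methods of ContinuousWriter, `RowsMessage` is a hypothetical stand-in for WriterCommitMessage, and `InMemoryEpochSink` is a hypothetical sink. Having a re-commit of the same epoch replace the earlier one is an illustrative choice, not behavior specified by the interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for WriterCommitMessage: the rows one task wrote in an epoch.
final class RowsMessage {
  final List<String> rows;

  RowsMessage(List<String> rows) {
    this.rows = rows;
  }
}

// Mirrors the two commit methods of ContinuousWriter; the rest of DataSourceV2Writer is omitted.
interface EpochWriter {
  void commit(long epochId, RowsMessage[] messages);

  default void commit(RowsMessage[] messages) {
    throw new UnsupportedOperationException(
        "Commit without epoch should not be called with ContinuousWriter");
  }
}

// Hypothetical in-memory sink that keeps committed rows per epoch.
final class InMemoryEpochSink implements EpochWriter {
  private final Map<Long, List<String>> rowsByEpoch = new TreeMap<>();

  @Override
  public synchronized void commit(long epochId, RowsMessage[] messages) {
    List<String> rows = new ArrayList<>();
    for (RowsMessage message : messages) {
      rows.addAll(message.rows);
    }
    // Assumption: re-committing an epoch replaces it, so a replay is not duplicated.
    rowsByEpoch.put(epochId, rows);
  }

  // All committed rows, in ascending epoch order.
  synchronized List<String> allRows() {
    List<String> all = new ArrayList<>();
    for (List<String> rows : rowsByEpoch.values()) {
      all.addAll(rows);
    }
    return Collections.unmodifiableList(all);
  }
}
```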
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8c7c1f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8c7c1f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8c7c1f2 Branch: refs/heads/master Commit: f8c7c1f21aa9d1fd38b584ca8c4adf397966e9f7 Parents: 1e44dd0 Author: Jose Torres <[email protected]> Authored: Wed Dec 13 22:31:39 2017 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Wed Dec 13 22:31:39 2017 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/kafka010/KafkaSource.scala | 1 + .../spark/sql/kafka010/KafkaSourceOffset.scala | 3 +- .../spark/sql/kafka010/KafkaSourceSuite.scala | 1 + .../sql/sources/v2/ContinuousReadSupport.java | 42 +++++ .../sql/sources/v2/ContinuousWriteSupport.java | 54 ++++++ .../sql/sources/v2/DataSourceV2Options.java | 8 + .../sql/sources/v2/MicroBatchReadSupport.java | 52 ++++++ .../sql/sources/v2/MicroBatchWriteSupport.java | 58 ++++++ .../sources/v2/reader/ContinuousDataReader.java | 36 ++++ .../sql/sources/v2/reader/ContinuousReader.java | 68 +++++++ .../sql/sources/v2/reader/MicroBatchReader.java | 64 +++++++ .../spark/sql/sources/v2/reader/Offset.java | 60 +++++++ .../sql/sources/v2/reader/PartitionOffset.java | 30 ++++ .../sql/sources/v2/writer/ContinuousWriter.java | 41 +++++ .../execution/streaming/BaseStreamingSink.java | 27 +++ .../streaming/BaseStreamingSource.java | 37 ++++ .../execution/streaming/FileStreamSource.scala | 1 + .../streaming/FileStreamSourceOffset.scala | 3 + .../sql/execution/streaming/LongOffset.scala | 2 + .../spark/sql/execution/streaming/Offset.scala | 34 +--- .../sql/execution/streaming/OffsetSeq.scala | 1 + .../sql/execution/streaming/OffsetSeqLog.scala | 1 + .../streaming/RateSourceProvider.scala | 22 ++- .../execution/streaming/RateStreamOffset.scala | 29 +++ .../streaming/RateStreamSourceV2.scala | 162 +++++++++++++++++ 
.../spark/sql/execution/streaming/Source.scala | 1 + .../execution/streaming/StreamExecution.scala | 1 + .../execution/streaming/StreamProgress.scala | 2 + .../continuous/ContinuousRateStreamSource.scala | 152 ++++++++++++++++ .../spark/sql/execution/streaming/memory.scala | 1 + .../sql/execution/streaming/memoryV2.scala | 178 +++++++++++++++++++ .../spark/sql/execution/streaming/socket.scala | 1 + .../execution/streaming/MemorySinkV2Suite.scala | 82 +++++++++ .../execution/streaming/OffsetSeqLogSuite.scala | 1 + .../execution/streaming/RateSourceSuite.scala | 1 + .../execution/streaming/RateSourceV2Suite.scala | 155 ++++++++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 1 + .../spark/sql/streaming/OffsetSuite.scala | 3 +- .../spark/sql/streaming/StreamSuite.scala | 1 + .../apache/spark/sql/streaming/StreamTest.scala | 1 + .../streaming/StreamingAggregationSuite.scala | 1 + .../streaming/StreamingQueryListenerSuite.scala | 1 + .../sql/streaming/StreamingQuerySuite.scala | 1 + .../test/DataStreamReaderWriterSuite.scala | 1 + .../sql/streaming/util/BlockingSource.scala | 3 +- 45 files changed, 1390 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index e9cff04..87f31fc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import 
org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.kafka010.KafkaSource._ +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index b5da415..6e24423 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} +import org.apache.spark.sql.execution.streaming.SerializedOffset +import org.apache.spark.sql.sources.v2.reader.Offset /** * An [[Offset]] for the [[KafkaSource]]. 
This one tracks all partitions of subscribed topics and http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 2034b9b..9cac0e5 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java new file mode 100644 index 0000000..ae4f858 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.sql.sources.v2.reader.ContinuousReader; +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data reading ability for continuous stream processing. + */ +public interface ContinuousReadSupport extends DataSourceV2 { + /** + * Creates a {@link ContinuousReader} to scan the data from this data source. + * + * @param schema the user provided schema, or empty() if none was provided + * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure + * recovery. Readers for the same logical source in the same query + * will be given the same checkpointLocation. + * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. 
+ */ + ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceV2Options options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java new file mode 100644 index 0000000..362d5f5 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. 
Data sources can implement this interface to + * provide data writing ability for continuous stream processing. + */ +@InterfaceStability.Evolving +public interface ContinuousWriteSupport extends BaseStreamingSink { + + /** + * Creates an optional {@link ContinuousWriter} to save the data to this data source. Data + * sources can return Optional.empty() if no writing needs to be done. + * + * @param queryId A unique string for the writing query. It's possible that there are many writing + * queries running at the same time, and the returned {@link DataSourceV2Writer} + * can use this id to distinguish itself from others. + * @param schema the schema of the data to be written. + * @param mode the output mode which determines what successive epoch output means to this + * sink, please refer to {@link OutputMode} for more details. + * @param options the options for the returned data source writer, which is an immutable + * case-insensitive string-to-string map. + */ + Optional<ContinuousWriter> createContinuousWriter( + String queryId, + StructType schema, + OutputMode mode, + DataSourceV2Options options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java index e98c045..ddc2acc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2Options.java @@ -36,6 +36,10 @@ public class DataSourceV2Options { return key.toLowerCase(Locale.ROOT); } + public static DataSourceV2Options empty() { + return new DataSourceV2Options(new HashMap<>()); + } + public DataSourceV2Options(Map<String, String> originalMap) { keyLowerCasedMap = new
HashMap<>(originalMap.size()); for (Map.Entry<String, String> entry : originalMap.entrySet()) { @@ -43,6 +47,10 @@ public class DataSourceV2Options { } } + public Map<String, String> asMap() { + return new HashMap<>(keyLowerCasedMap); + } + /** * Returns the option value to which the specified key is mapped, case-insensitively. */ http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java new file mode 100644 index 0000000..442cad0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.reader.MicroBatchReader; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. 
Data sources can implement this interface to + * provide streaming micro-batch data reading ability. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends DataSourceV2 { + /** + * Creates a {@link MicroBatchReader} to read batches of data from this data source in a + * streaming query. + * + * The execution engine will create a micro-batch reader at the start of a streaming query, + * alternate calls to setOffsetRange and createReadTasks for each batch to process, and then + * call stop() when the execution is complete. Note that a single query may have multiple + * executions due to restart or failure recovery. + * + * @param schema the user provided schema, or empty() if none was provided + * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure + * recovery. Readers for the same logical source in the same query + * will be given the same checkpointLocation. + * @param options the options for the returned data source reader, which is an immutable + * case-insensitive string-to-string map. + */ + MicroBatchReader createMicroBatchReader( + Optional<StructType> schema, + String checkpointLocation, + DataSourceV2Options options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java new file mode 100644 index 0000000..6364077 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Optional; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSink; +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.types.StructType; + +/** + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * provide data writing ability and save the data from a micro-batch to the data source. + */ +@InterfaceStability.Evolving +public interface MicroBatchWriteSupport extends BaseStreamingSink { + + /** + * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data + * sources can return Optional.empty() if no writing needs to be done. + * + * @param queryId A unique string for the writing query. It's possible that there are many writing + * queries running at the same time, and the returned {@link DataSourceV2Writer} + * can use this id to distinguish itself from others. + * @param epochId The unique numeric ID of the batch within this writing query. This is an + * incrementing counter representing a consistent set of data; the same batch may + * be started multiple times in failure recovery scenarios, but it will always + * contain the same records. + * @param schema the schema of the data to be written.
+ * @param mode the output mode which determines what successive batch output means to this + * sink, please refer to {@link OutputMode} for more details. + * @param options the options for the returned data source writer, which is an immutable + * case-insensitive string-to-string map. + */ + Optional<DataSourceV2Writer> createMicroBatchWriter( + String queryId, + long epochId, + StructType schema, + OutputMode mode, + DataSourceV2Options options); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java new file mode 100644 index 0000000..11b99a9 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.sources.v2.reader.PartitionOffset; + +import java.io.IOException; + +/** + * A variation on {@link DataReader} for use with streaming in continuous processing mode. + */ +public interface ContinuousDataReader<T> extends DataReader<T> { + /** + * Get the offset of the current record, or the start offset if no records have been read. + * + * The execution engine will call this method along with get() to keep track of the current + * offset. When an epoch ends, the offset of the previous record in each partition will be saved + * as a restart checkpoint. + */ + PartitionOffset getOffset(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java new file mode 100644 index 0000000..1baf82c --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.sources.v2.reader.PartitionOffset; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * interface to allow reading in a continuous processing mode stream. + * + * Implementations must ensure each read task output is a {@link ContinuousDataReader}. + */ +public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader { + /** + * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to + * a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Set the desired start offset for read tasks created from this reader. The scan will start + * from the first record after the provided offset, or from an implementation-defined inferred + * starting point if no offset is provided. + */ + void setOffset(Optional<Offset> start); + + /** + * Return the specified or inferred start offset for this reader. + * + * @throws IllegalStateException if setOffset has not been called + */ + Offset getStartOffset(); + + /** + * The execution engine will call this method in every epoch to determine if new read tasks need + * to be generated, which may be required if for example the underlying source system has had + * partitions added or removed. + * + * If true, the query will be shut down and restarted with a new reader. 
+ */ + default boolean needsReconfiguration() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java new file mode 100644 index 0000000..438e3f5 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.sql.sources.v2.reader.Offset; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; + +import java.util.Optional; + +/** + * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this + * interface to indicate they allow micro-batch streaming reads. + */ +public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource { + /** + * Set the desired offset range for read tasks created from this reader. 
Read tasks will + * generate only data within (`start`, `end`]; that is, from the first record after `start` to + * the record with offset `end`. + * + * @param start The initial offset to scan from. If not specified, scan from an + * implementation-specified start point, such as the earliest available record. + * @param end The last offset to include in the scan. If not specified, scan up to an + * implementation-defined endpoint, such as the last available offset + * or the start offset plus a target batch size. + */ + void setOffsetRange(Optional<Offset> start, Optional<Offset> end); + + /** + * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset + * for this reader. + * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getStartOffset(); + + /** + * Return the specified (if explicitly set through setOffsetRange) or inferred end offset + * for this reader. + * + * @throws IllegalStateException if setOffsetRange has not been called + */ + Offset getEndOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java new file mode 100644 index 0000000..1ebd353 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +/** + * An abstract representation of progress through a {@link MicroBatchReader} or + * {@link ContinuousReader}. During execution, Offsets provided by the data source implementation + * will be logged and used as restart checkpoints. Sources should provide an Offset implementation + * which they can use to reconstruct the stream position where the offset was taken. + */ +public abstract class Offset { + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * Note: We assume that equivalent/equal offsets serialize to + * identical JSON strings. + * + * @return JSON string encoding + */ + public abstract String json(); + + /** + * Equality based on JSON string representation. We leverage the + * JSON representation for normalization between the Offset's + * in-memory and on-disk representations.
+ */ + @Override + public boolean equals(Object obj) { + if (obj instanceof Offset) { + return this.json().equals(((Offset) obj).json()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.json().hashCode(); + } + + @Override + public String toString() { + return this.json(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java new file mode 100644 index 0000000..07826b6 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +/** + * Used for per-partition offsets in continuous processing. ContinuousReader implementations will + * provide a method to merge these into a global Offset. + * + * These offsets must be serializable. 
+ */ +public interface PartitionOffset extends Serializable { + +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java new file mode 100644 index 0000000..618f47e --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A {@link DataSourceV2Writer} for use with continuous stream processing. + */ +@InterfaceStability.Evolving +public interface ContinuousWriter extends DataSourceV2Writer { + /** + * Commits this writing job for the specified epoch with a list of commit messages. The commit + * messages are collected from successful data writers and are produced by + * {@link DataWriter#commit()}.
+ * + * If this method fails (by throwing an exception), this writing job is considered to have been + * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}. + */ + void commit(long epochId, WriterCommitMessage[] messages); + + default void commit(WriterCommitMessage[] messages) { + throw new UnsupportedOperationException( + "Commit without epoch should not be called with ContinuousWriter"); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java new file mode 100644 index 0000000..ac96c27 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSink.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming; + +/** + * The shared interface between V1 and V2 streaming sinks. 
+ * + * This is a temporary interface for compatibility during migration. It should not be implemented + * directly, and will be removed in future versions. + */ +public interface BaseStreamingSink { +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java new file mode 100644 index 0000000..3a02cbf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BaseStreamingSource.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming; + +import org.apache.spark.sql.sources.v2.reader.Offset; + +/** + * The shared interface between V1 streaming sources and V2 streaming readers. + * + * This is a temporary interface for compatibility during migration. It should not be implemented + * directly, and will be removed in future versions. 
+ */ +public interface BaseStreamingSource { + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); + + /** Stop this source and free any resources it has allocated. */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0debd7d..a33b785 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -27,6 +27,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index 06d0fe6..431e5b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -22,8 +22,11 @@ import scala.util.control.Exception._ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.sql.sources.v2.reader.Offset + /** * Offset for the [[FileStreamSource]]. + * * @param logOffset Position in the [[FileStreamSourceLog]] */ case class FileStreamSourceOffset(logOffset: Long) extends Offset { http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 5f0b195..7ea3146 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.sources.v2.reader.Offset + /** * A simple offset for sources that produce a single linear stream of data. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 4efcee0..73f0c62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -17,38 +17,8 @@ package org.apache.spark.sql.execution.streaming -/** - * An offset is a monotonically increasing metric used to track progress in the computation of a - * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global - * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no - * new data has arrived. - */ -abstract class Offset { - - /** - * Equality based on JSON string representation. We leverage the - * JSON representation for normalization between the Offset's - * in memory and on disk representations. - */ - override def equals(obj: Any): Boolean = obj match { - case o: Offset => this.json == o.json - case _ => false - } - - override def hashCode(): Int = this.json.hashCode +import org.apache.spark.sql.sources.v2.reader.Offset - override def toString(): String = this.json.toString - - /** - * A JSON-serialized representation of an Offset that is - * used for saving offsets to the offset log. - * Note: We assume that equivalent/equal offsets serialize to - * identical JSON strings. - * - * @return JSON string encoding - */ - def json: String -} /** * Used when loading a JSON serialized offset from external storage. @@ -58,3 +28,5 @@ abstract class Offset { * that accepts a [[SerializedOffset]] for doing the conversion. 
*/ case class SerializedOffset(override val json: String) extends Offset + + http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 4e0a468..dcc5935 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -23,6 +23,7 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS} +import org.apache.spark.sql.sources.v2.reader.Offset /** * An ordered collection of offsets, used to track the progress of processing data from one or more http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index e3f4abc..bfdbc65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets._ import scala.io.{Source => IOSource} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.sources.v2.reader.Offset /** * This class is used to log offsets to persistent files in HDFS. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala index 077a477..50671a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.io._ import java.nio.charset.StandardCharsets +import java.util.Optional import java.util.concurrent.TimeUnit import org.apache.commons.io.IOUtils @@ -28,7 +29,10 @@ import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset} import org.apache.spark.sql.types._ import org.apache.spark.util.{ManualClock, SystemClock} @@ -46,7 +50,8 @@ import org.apache.spark.util.{ManualClock, SystemClock} * generated rows. The source will try its best to reach `rowsPerSecond`, but the query may * be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed. 
*/ -class RateSourceProvider extends StreamSourceProvider with DataSourceRegister { +class RateSourceProvider extends StreamSourceProvider with DataSourceRegister + with DataSourceV2 with MicroBatchReadSupport with ContinuousReadSupport{ override def sourceSchema( sqlContext: SQLContext, @@ -100,6 +105,21 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister { params.get("useManualClock").map(_.toBoolean).getOrElse(false) // Only for testing ) } + + override def createMicroBatchReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): MicroBatchReader = { + new RateStreamV2Reader(options) + } + + override def createContinuousReader( + schema: Optional[StructType], + checkpointLocation: String, + options: DataSourceV2Options): ContinuousReader = { + new ContinuousRateStreamReader(options) + } + override def shortName(): String = "rate" } http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala new file mode 100644 index 0000000..13679df --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.sources.v2.reader.Offset + +case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)]) + extends Offset { + implicit val defaultFormats: DefaultFormats = DefaultFormats + override val json = Serialization.write(partitionToValueAndRunTimeMs) +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala new file mode 100644 index 0000000..102551c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.Optional + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.sources.v2.DataSourceV2Options +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} +import org.apache.spark.util.SystemClock + +class RateStreamV2Reader(options: DataSourceV2Options) + extends MicroBatchReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val clock = new SystemClock + + private val numPartitions = + options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt + private val rowsPerSecond = + options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong + + // The interval (in milliseconds) between rows in each partition. + // e.g. if there are 4 global rows per second, and 2 partitions, each partition + // should output rows every (1000 * 2 / 4) = 500 ms. 
+ private val msPerPartitionBetweenRows = (1000 * numPartitions) / rowsPerSecond + + override def readSchema(): StructType = { + StructType( + StructField("timestamp", TimestampType, false) :: + StructField("value", LongType, false) :: Nil) + } + + val creationTimeMs = clock.getTimeMillis() + + private var start: RateStreamOffset = _ + private var end: RateStreamOffset = _ + + override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = { + this.start = start.orElse( + RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs)) + .asInstanceOf[RateStreamOffset] + + this.end = end.orElse { + val currentTime = clock.getTimeMillis() + RateStreamOffset( + this.start.partitionToValueAndRunTimeMs.map { + case startOffset @ (part, (currentVal, currentReadTime)) => + // Calculate the number of rows we should advance in this partition (based on the + // current time), and output a corresponding offset. + val readInterval = currentTime - currentReadTime + val numNewRows = readInterval / msPerPartitionBetweenRows + if (numNewRows <= 0) { + startOffset + } else { + (part, + (currentVal + (numNewRows * numPartitions), + currentReadTime + (numNewRows * msPerPartitionBetweenRows))) + } + } + ) + }.asInstanceOf[RateStreamOffset] + } + + override def getStartOffset(): Offset = { + if (start == null) throw new IllegalStateException("start offset not set") + start + } + override def getEndOffset(): Offset = { + if (end == null) throw new IllegalStateException("end offset not set") + end + } + + override def deserializeOffset(json: String): Offset = { + RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json)) + } + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { + val startMap = start.partitionToValueAndRunTimeMs + val endMap = end.partitionToValueAndRunTimeMs + endMap.keys.toSeq.map { part => + val (endVal, _) = endMap(part) + val (startVal, startTimeMs) = startMap(part) + + val packedRows = mutable.ListBuffer[(Long, 
Long)]() + var outVal = startVal + numPartitions + var outTimeMs = startTimeMs + msPerPartitionBetweenRows + while (outVal <= endVal) { + packedRows.append((outTimeMs, outVal)) + outVal += numPartitions + outTimeMs += msPerPartitionBetweenRows + } + + RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]] + }.toList.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} +} + +case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] { + override def createDataReader(): DataReader[Row] = new RateStreamBatchReader(vals) +} + +class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] { + var currentIndex = -1 + + override def next(): Boolean = { + // Return true as long as the new index is in the seq. + currentIndex += 1 + currentIndex < vals.size + } + + override def get(): Row = { + Row( + DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)), + vals(currentIndex)._2) + } + + override def close(): Unit = {} +} + +object RateStreamSourceV2 { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" + + private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = { + RateStreamOffset( + Range(0, numPartitions).map { i => + // Note that the starting offset is exclusive, so we have to decrement the starting value + // by the increment that will later be applied. The first row output in each + // partition will have a value equal to the partition index. 
+ (i, + ((i - numPartitions).toLong, + creationTimeMs)) + }.toMap) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 311942f..dbb408f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 406560c..16063c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} 
http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index a3f3662..770db40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming import scala.collection.{immutable, GenTraversableOnce} +import org.apache.spark.sql.sources.v2.reader.Offset + /** * A helper class that looks like a Map[Source, Offset]. */ http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala new file mode 100644 index 0000000..77fc267 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} + +case class ContinuousRateStreamPartitionOffset( + partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset + +class ContinuousRateStreamReader(options: DataSourceV2Options) + extends ContinuousReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val creationTime = System.currentTimeMillis() + + val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt + val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong + val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { + assert(offsets.length == numPartitions) + val tuples = offsets.map { + case ContinuousRateStreamPartitionOffset(i, currVal, nextRead) => (i, (currVal, nextRead)) + } + RateStreamOffset(Map(tuples: _*)) + } + + override def deserializeOffset(json: String): Offset = { + 
RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json)) + } + + override def readSchema(): StructType = RateSourceProvider.SCHEMA + + private var offset: Offset = _ + + override def setOffset(offset: java.util.Optional[Offset]): Unit = { + this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime)) + } + + override def getStartOffset(): Offset = offset + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { + val partitionStartMap = offset match { + case off: RateStreamOffset => off.partitionToValueAndRunTimeMs + case off => + throw new IllegalArgumentException( + s"invalid offset type ${off.getClass()} for ContinuousRateSource") + } + if (partitionStartMap.keySet.size != numPartitions) { + throw new IllegalArgumentException( + s"The previous run contained ${partitionStartMap.keySet.size} partitions, but" + + s" $numPartitions partitions are currently configured. The numPartitions option" + + " cannot be changed.") + } + + Range(0, numPartitions).map { i => + val start = partitionStartMap(i) + // Have each partition advance by numPartitions each row, with starting points staggered + // by their partition index. 
+ RateStreamReadTask( + start._1, // starting row value + start._2, // starting time in ms + i, + numPartitions, + perPartitionRate) + .asInstanceOf[ReadTask[Row]] + }.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} + +} + +case class RateStreamReadTask( + startValue: Long, + startTimeMs: Long, + partitionIndex: Int, + increment: Long, + rowsPerSecond: Double) + extends ReadTask[Row] { + override def createDataReader(): DataReader[Row] = + new RateStreamDataReader(startValue, startTimeMs, partitionIndex, increment, rowsPerSecond) +} + +class RateStreamDataReader( + startValue: Long, + startTimeMs: Long, + partitionIndex: Int, + increment: Long, + rowsPerSecond: Double) + extends ContinuousDataReader[Row] { + private var nextReadTime: Long = startTimeMs + private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong + + private var currentValue = startValue + private var currentRow: Row = null + + override def next(): Boolean = { + currentValue += increment + nextReadTime += readTimeIncrement + + try { + while (System.currentTimeMillis < nextReadTime) { + Thread.sleep(nextReadTime - System.currentTimeMillis) + } + } catch { + case _: InterruptedException => + // Someone's trying to end the task; just let them. 
+ return false + } + + currentRow = Row( + DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(nextReadTime)), + currentValue) + + true + } + + override def get: Row = currentRow + + override def close(): Unit = {} + + override def getOffset(): PartitionOffset = + ContinuousRateStreamPartitionOffset(partitionIndex, currentValue, nextReadTime) +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 3041d4d..db07175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala new file mode 100644 index 0000000..437040c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit + * tests and does not provide durability. 
+ */ +class MemorySinkV2 extends DataSourceV2 + with MicroBatchWriteSupport with ContinuousWriteSupport with Logging { + + override def createMicroBatchWriter( + queryId: String, + batchId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = { + java.util.Optional.of(new MemoryWriter(this, batchId, mode)) + } + + override def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { + java.util.Optional.of(new ContinuousMemoryWriter(this, mode)) + } + + private case class AddedData(batchId: Long, data: Array[Row]) + + /** An ordered list of batches that have been written to this [[Sink]]. */ + @GuardedBy("this") + private val batches = new ArrayBuffer[AddedData]() + + /** Returns all rows that are stored in this [[Sink]]. */ + def allData: Seq[Row] = synchronized { + batches.flatMap(_.data) + } + + def latestBatchId: Option[Long] = synchronized { + batches.lastOption.map(_.batchId) + } + + def latestBatchData: Seq[Row] = synchronized { + batches.lastOption.toSeq.flatten(_.data) + } + + def toDebugString: String = synchronized { + batches.map { case AddedData(batchId, data) => + val dataStr = try data.mkString(" ") catch { + case NonFatal(e) => "[Error converting to string]" + } + s"$batchId: $dataStr" + }.mkString("\n") + } + + def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { + val notCommitted = synchronized { + latestBatchId.isEmpty || batchId > latestBatchId.get + } + if (notCommitted) { + logDebug(s"Committing batch $batchId to $this") + outputMode match { + case Append | Update => + val rows = AddedData(batchId, newRows) + synchronized { batches += rows } + + case Complete => + val rows = AddedData(batchId, newRows) + synchronized { + batches.clear() + batches += rows + } + + case _ => + throw new IllegalArgumentException( + s"Output mode $outputMode is not
supported by MemorySink") + } + } else { + logDebug(s"Skipping already committed batch: $batchId") + } + } + + def clear(): Unit = synchronized { + batches.clear() + } + + override def toString(): String = "MemorySink" +} + +case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} + +class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode) + extends DataSourceV2Writer with Logging { + + override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) + + def commit(messages: Array[WriterCommitMessage]): Unit = { + val newRows = messages.flatMap { + case message: MemoryWriterCommitMessage => message.data + } + sink.write(batchId, outputMode, newRows) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // Don't accept any of the new input. + } +} + +class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode) + extends ContinuousWriter { + + override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { + val newRows = messages.flatMap { + case message: MemoryWriterCommitMessage => message.data + } + sink.write(epochId, outputMode, newRows) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // Don't accept any of the new input. 
+ } +} + +case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] { + def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { + new MemoryDataWriter(partitionId, outputMode) + } +} + +class MemoryDataWriter(partition: Int, outputMode: OutputMode) + extends DataWriter[Row] with Logging { + + private val data = mutable.Buffer[Row]() + + override def write(row: Row): Unit = { + data.append(row) + } + + override def commit(): MemoryWriterCommitMessage = { + val msg = MemoryWriterCommitMessage(partition, data.clone()) + data.clear() + msg + } + + override def abort(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 0b22cbc..440cae0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala new file mode 100644 index 0000000..be4b490 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Row +import org.apache.spark.sql.streaming.{OutputMode, StreamTest} + +class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { + test("data writer") { + val partition = 1234 + val writer = new MemoryDataWriter(partition, OutputMode.Append()) + writer.write(Row(1)) + writer.write(Row(2)) + writer.write(Row(44)) + val msg = writer.commit() + assert(msg.data.map(_.getInt(0)) == Seq(1, 2, 44)) + assert(msg.partition == partition) + + // Buffer should be cleared, so repeated commits should give empty. 
+ assert(writer.commit().data.isEmpty) + } + + test("continuous writer") { + val sink = new MemorySinkV2 + val writer = new ContinuousMemoryWriter(sink, OutputMode.Append()) + writer.commit(0, + Array( + MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), + MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), + MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) + )) + assert(sink.latestBatchId.contains(0)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) + writer.commit(19, + Array( + MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), + MemoryWriterCommitMessage(0, Seq(Row(33))) + )) + assert(sink.latestBatchId.contains(19)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) + + assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) + } + + test("microbatch writer") { + val sink = new MemorySinkV2 + new MemoryWriter(sink, 0, OutputMode.Append()).commit( + Array( + MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), + MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), + MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) + )) + assert(sink.latestBatchId.contains(0)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) + new MemoryWriter(sink, 19, OutputMode.Append()).commit( + Array( + MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), + MemoryWriterCommitMessage(0, Seq(Row(33))) + )) + assert(sink.latestBatchId.contains(19)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) + + assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index e6cdc06..4868ba4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.stringToFile import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.test.SharedSQLContext class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala index 03d0f63..ceba27b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions._ +import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.util.ManualClock http://git-wip-us.apache.org/repos/asf/spark/blob/f8c7c1f2/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala new file mode 100644 index 0000000..ef801ce --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.Optional + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport} +import org.apache.spark.sql.streaming.StreamTest + +class RateSourceV2Suite extends StreamTest { + test("microbatch in registry") { + DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { + case ds: MicroBatchReadSupport => + val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty()) + assert(reader.isInstanceOf[RateStreamV2Reader]) + case _ => + throw new IllegalStateException("Could not find v2 read support for rate") + } + } + + test("microbatch - numPartitions propagated") { + val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava)) + reader.setOffsetRange(Optional.empty(), Optional.empty()) + val tasks = reader.createReadTasks() + assert(tasks.size == 11) + } + + test("microbatch - set offset") { + val reader = new RateStreamV2Reader(DataSourceV2Options.empty()) + val startOffset = RateStreamOffset(Map((0, (0, 1000)))) + val endOffset = RateStreamOffset(Map((0, (0, 2000)))) + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) + assert(reader.getStartOffset() == startOffset) + assert(reader.getEndOffset() == endOffset) + } + + test("microbatch - infer offsets") { + val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava)) + reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100) + reader.setOffsetRange(Optional.empty(), Optional.empty()) + reader.getStartOffset() match { + case r: RateStreamOffset => + assert(r.partitionToValueAndRunTimeMs(0)._2 == 
reader.creationTimeMs) + case _ => throw new IllegalStateException("unexpected offset type") + } + reader.getEndOffset() match { + case r: RateStreamOffset => + // End offset may be a bit beyond 100 ms/9 rows after creation if the wait lasted + // longer than 100ms. It should never be early. + assert(r.partitionToValueAndRunTimeMs(0)._1 >= 9) + assert(r.partitionToValueAndRunTimeMs(0)._2 >= reader.creationTimeMs + 100) + + case _ => throw new IllegalStateException("unexpected offset type") + } + } + + test("microbatch - predetermined batch size") { + val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava)) + val startOffset = RateStreamOffset(Map((0, (0, 1000)))) + val endOffset = RateStreamOffset(Map((0, (20, 2000)))) + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) + val tasks = reader.createReadTasks() + assert(tasks.size == 1) + assert(tasks.get(0).asInstanceOf[RateStreamBatchTask].vals.size == 20) + } + + test("microbatch - data read") { + val reader = new RateStreamV2Reader( + new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava)) + val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs) + val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map { + case (part, (currentVal, currentReadTime)) => + (part, (currentVal + 33, currentReadTime + 1000)) + }.toMap) + + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) + val tasks = reader.createReadTasks() + assert(tasks.size == 11) + + val readData = tasks.asScala + .map(_.createDataReader()) + .flatMap { reader => + val buf = scala.collection.mutable.ListBuffer[Row]() + while (reader.next()) buf.append(reader.get()) + buf + } + + assert(readData.map(_.getLong(1)).sorted == Range(0, 33)) + } + + test("continuous in registry") { + DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { + case 
ds: ContinuousReadSupport => + val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty()) + assert(reader.isInstanceOf[ContinuousRateStreamReader]) + case _ => + throw new IllegalStateException("Could not find v2 read support for rate") + } + } + + test("continuous data") { + val reader = new ContinuousRateStreamReader( + new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava)) + reader.setOffset(Optional.empty()) + val tasks = reader.createReadTasks() + assert(tasks.size == 2) + + val data = scala.collection.mutable.ListBuffer[Row]() + tasks.asScala.foreach { + case t: RateStreamReadTask => + val startTimeMs = reader.getStartOffset() + .asInstanceOf[RateStreamOffset] + .partitionToValueAndRunTimeMs(t.partitionIndex) + ._2 + val r = t.createDataReader().asInstanceOf[RateStreamDataReader] + for (rowIndex <- 0 to 9) { + r.next() + data.append(r.get()) + assert(r.getOffset() == + ContinuousRateStreamPartitionOffset( + t.partitionIndex, + t.partitionIndex + rowIndex * 2, + startTimeMs + (rowIndex + 1) * 100)) + } + assert(System.currentTimeMillis() >= startTimeMs + 1000) + + case _ => throw new IllegalStateException("Unexpected task type") + } + + assert(data.map(_.getLong(1)).toSeq.sorted == Range(0, 20)) + } +}
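The commit rule that `MemorySinkV2.write` applies in the diff above (a micro-batch or epoch is stored only when its id is greater than the latest stored id; otherwise it is logged as already committed and skipped) can be modeled without any Spark dependencies. The following is a minimal sketch under that reading of the code; `EpochSink`, `commit`, `latestId`, `allData`, and the `complete` flag are hypothetical names for illustration only and are not part of the DataSourceV2 API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Spark-free model of the commit rule in MemorySinkV2.write:
// an epoch (or micro-batch) is stored only if its id is greater than the
// latest id already stored, so re-delivering an epoch is a no-op.
final class EpochSink[T] {
  private case class Batch(id: Long, rows: Seq[T])

  // Guarded by `this`, like MemorySinkV2.batches.
  private val batches = ArrayBuffer[Batch]()

  def latestId: Option[Long] = synchronized { batches.lastOption.map(_.id) }

  /** Stores `rows` under `id`; returns false if `id` was already committed. */
  def commit(id: Long, rows: Seq[T], complete: Boolean = false): Boolean = synchronized {
    if (latestId.exists(_ >= id)) {
      false // mirrors the "Skipping already committed batch" branch
    } else {
      if (complete) batches.clear() // Complete mode keeps only the newest result
      batches += Batch(id, rows)
      true
    }
  }

  def allData: Seq[T] = synchronized { batches.flatMap(_.rows).toSeq }
}

object EpochSinkDemo {
  def main(args: Array[String]): Unit = {
    val sink = new EpochSink[Int]
    sink.commit(0, Seq(1, 2, 3))
    sink.commit(19, Seq(11, 22))
    sink.commit(19, Seq(99)) // re-delivered epoch 19: ignored
    println(sink.latestId)               // Some(19)
    println(sink.allData.mkString(", ")) // 1, 2, 3, 11, 22
  }
}
```

Because the check compares only against the latest stored id, epoch ids need not be contiguous (the `continuous writer` test in `MemorySinkV2Suite` commits epoch 0 and then epoch 19), but any id that is not strictly greater than the latest one is dropped, even if its rows differ.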
