Repository: spark Updated Branches: refs/heads/master 7570eab6b -> 7798c9e6e
[SPARK-22824] Restore old offset for binary compatibility ## What changes were proposed in this pull request? Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3. Author: Jose Torres <j...@databricks.com> Closes #20012 from joseph-torres/binary-compat. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7798c9e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7798c9e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7798c9e6 Branch: refs/heads/master Commit: 7798c9e6ef55dbadfc9eb896fa3f366c76dc187b Parents: 7570eab Author: Jose Torres <j...@databricks.com> Authored: Wed Dec 20 10:43:10 2017 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Wed Dec 20 10:43:10 2017 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/kafka010/KafkaSource.scala | 1 - .../spark/sql/kafka010/KafkaSourceOffset.scala | 3 +- .../spark/sql/kafka010/KafkaSourceSuite.scala | 1 - .../spark/sql/sources/v2/reader/Offset.java | 6 +- .../execution/streaming/FileStreamSource.scala | 1 - .../streaming/FileStreamSourceOffset.scala | 2 - .../sql/execution/streaming/LongOffset.scala | 2 - .../streaming/MicroBatchExecution.scala | 1 - .../spark/sql/execution/streaming/Offset.java | 61 +++++++ .../spark/sql/execution/streaming/Offset.scala | 32 ---- .../sql/execution/streaming/OffsetSeq.scala | 1 - .../sql/execution/streaming/OffsetSeqLog.scala | 1 - .../streaming/RateSourceProvider.scala | 3 +- .../execution/streaming/RateStreamOffset.scala | 4 +- .../streaming/RateStreamSourceV2.scala | 162 ----------------- .../execution/streaming/SerializedOffset.scala | 29 +++ .../spark/sql/execution/streaming/Source.scala | 1 - .../execution/streaming/StreamExecution.scala | 1 - .../execution/streaming/StreamProgress.scala | 2 - .../continuous/ContinuousRateStreamSource.scala | 3 +- .../spark/sql/execution/streaming/memory.scala | 1 - .../sql/execution/streaming/memoryV2.scala | 178 ------------------ .../spark/sql/execution/streaming/socket.scala | 1 - .../streaming/sources/RateStreamSourceV2.scala | 165 +++++++++++++++++ .../execution/streaming/sources/memoryV2.scala | 179 +++++++++++++++++++ .../execution/streaming/MemorySinkV2Suite.scala | 1 + .../execution/streaming/RateSourceV2Suite.scala | 1 + .../sql/streaming/FileStreamSourceSuite.scala | 1 - .../spark/sql/streaming/OffsetSuite.scala | 3 +- .../spark/sql/streaming/StreamSuite.scala | 1 - .../apache/spark/sql/streaming/StreamTest.scala | 1 - .../streaming/StreamingAggregationSuite.scala | 1 - .../streaming/StreamingQueryListenerSuite.scala | 1 - .../sql/streaming/StreamingQuerySuite.scala | 2 - .../test/DataStreamReaderWriterSuite.scala | 1 - .../sql/streaming/util/BlockingSource.scala | 3 +- 36 files changed, 448 insertions(+), 409 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 87f31fc..e9cff04 100644 --- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.kafka010.KafkaSource._ -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index 6e24423..b5da415 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.SerializedOffset -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} /** * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 9cac0e5..2034b9b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java index 1ebd353..ce1c489 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java @@ -23,7 +23,7 @@ package org.apache.spark.sql.sources.v2.reader; * restart checkpoints. Sources should provide an Offset implementation which they can use to * reconstruct the stream position where the offset was taken. 
*/ -public abstract class Offset { +public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset { /** * A JSON-serialized representation of an Offset that is * used for saving offsets to the offset log. @@ -41,8 +41,8 @@ public abstract class Offset { */ @Override public boolean equals(Object obj) { - if (obj instanceof Offset) { - return this.json().equals(((Offset) obj).json()); + if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) { + return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json()); } else { return false; } http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index a33b785..0debd7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -27,7 +27,6 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala index 431e5b9..a2b49d94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala @@ -22,8 +22,6 @@ import scala.util.control.Exception._ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization -import org.apache.spark.sql.sources.v2.reader.Offset - /** * Offset for the [[FileStreamSource]]. * http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 7ea3146..5f0b195 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.sources.v2.reader.Offset - /** * A simple offset for sources that produce a single linear stream of data. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index a67dda9..4a3de8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.{Clock, Utils} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java new file mode 100644 index 0000000..80aa550 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming; + +/** + * This is an internal, deprecated interface. New source implementations should use the + * org.apache.spark.sql.sources.v2.reader.Offset class, which is the one that will be supported + * in the long term. + * + * This class will be removed in a future release. + */ +public abstract class Offset { + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * Note: We assume that equivalent/equal offsets serialize to + * identical JSON strings. + * + * @return JSON string encoding + */ + public abstract String json(); + + /** + * Equality based on JSON string representation. We leverage the + * JSON representation for normalization between the Offset's + * in memory and on disk representations. 
+ */ + @Override + public boolean equals(Object obj) { + if (obj instanceof Offset) { + return this.json().equals(((Offset) obj).json()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return this.json().hashCode(); + } + + @Override + public String toString() { + return this.json(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala deleted file mode 100644 index 73f0c62..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.sql.sources.v2.reader.Offset - - -/** - * Used when loading a JSON serialized offset from external storage. - * We are currently not responsible for converting JSON serialized - * data into an internal (i.e., object) representation. Sources should - * define a factory method in their source Offset companion objects - * that accepts a [[SerializedOffset]] for doing the conversion. 
- */ -case class SerializedOffset(override val json: String) extends Offset - - http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index dcc5935..4e0a468 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -23,7 +23,6 @@ import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql.RuntimeConfig import org.apache.spark.sql.internal.SQLConf.{SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS} -import org.apache.spark.sql.sources.v2.reader.Offset /** * An ordered collection of offsets, used to track the progress of processing data from one or more http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index bfdbc65..e3f4abc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets._ import scala.io.{Source => IOSource} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.sources.v2.reader.Offset /** * This class is used to log offsets to persistent files in HDFS. 
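For illustration only (a minimal sketch, not part of this patch): the equality contract described above means any two offsets compare equal when their json() output matches, regardless of concrete subclass. That is what lets an offset read back from the offset log as a SerializedOffset match the typed in-memory offset it was written from. LongOffset and SerializedOffset are classes touched by this patch; the sketch assumes LongOffset serializes its value as a bare numeric string.

import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset}

object OffsetEqualityExample {
  def main(args: Array[String]): Unit = {
    val inMemory = LongOffset(10)                  // json() == "10"
    val fromLog = SerializedOffset(inMemory.json)  // as if reloaded from the offset log
    // Offset.equals() and hashCode() are defined over json(), so offsets of
    // different concrete classes are equal whenever their JSON forms match.
    assert(inMemory == fromLog)
    assert(inMemory.hashCode == fromLog.hashCode)
  }
}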
http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala index 50671a4..4176132 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala @@ -30,9 +30,10 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader +import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader, Offset} +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader} import org.apache.spark.sql.types._ import org.apache.spark.util.{ManualClock, SystemClock} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala index 13679df..726d857 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.streaming import org.json4s.DefaultFormats import org.json4s.jackson.Serialization -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.sources.v2 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, (Long, Long)]) - extends Offset { + extends v2.reader.Offset { implicit val defaultFormats: DefaultFormats = DefaultFormats override val json = Serialization.write(partitionToValueAndRunTimeMs) } http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala deleted file mode 100644 index 102551c..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamSourceV2.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.util.Optional - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.json4s.DefaultFormats -import org.json4s.jackson.Serialization - -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.sources.v2.DataSourceV2Options -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} -import org.apache.spark.util.SystemClock - -class RateStreamV2Reader(options: DataSourceV2Options) - extends MicroBatchReader { - implicit val defaultFormats: DefaultFormats = DefaultFormats - - val clock = new SystemClock - - private val numPartitions = - options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt - private val rowsPerSecond = - options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong - - // The interval (in milliseconds) between rows in each partition. - // e.g. if there are 4 global rows per second, and 2 partitions, each partition - // should output rows every (1000 * 2 / 4) = 500 ms. - private val msPerPartitionBetweenRows = (1000 * numPartitions) / rowsPerSecond - - override def readSchema(): StructType = { - StructType( - StructField("timestamp", TimestampType, false) :: - StructField("value", LongType, false) :: Nil) - } - - val creationTimeMs = clock.getTimeMillis() - - private var start: RateStreamOffset = _ - private var end: RateStreamOffset = _ - - override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = { - this.start = start.orElse( - RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs)) - .asInstanceOf[RateStreamOffset] - - this.end = end.orElse { - val currentTime = clock.getTimeMillis() - RateStreamOffset( - this.start.partitionToValueAndRunTimeMs.map { - case startOffset @ (part, (currentVal, currentReadTime)) => - // Calculate the number of rows we should advance in this partition (based on the - // current time), and output a corresponding offset. 
- val readInterval = currentTime - currentReadTime - val numNewRows = readInterval / msPerPartitionBetweenRows - if (numNewRows <= 0) { - startOffset - } else { - (part, - (currentVal + (numNewRows * numPartitions), - currentReadTime + (numNewRows * msPerPartitionBetweenRows))) - } - } - ) - }.asInstanceOf[RateStreamOffset] - } - - override def getStartOffset(): Offset = { - if (start == null) throw new IllegalStateException("start offset not set") - start - } - override def getEndOffset(): Offset = { - if (end == null) throw new IllegalStateException("end offset not set") - end - } - - override def deserializeOffset(json: String): Offset = { - RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json)) - } - - override def createReadTasks(): java.util.List[ReadTask[Row]] = { - val startMap = start.partitionToValueAndRunTimeMs - val endMap = end.partitionToValueAndRunTimeMs - endMap.keys.toSeq.map { part => - val (endVal, _) = endMap(part) - val (startVal, startTimeMs) = startMap(part) - - val packedRows = mutable.ListBuffer[(Long, Long)]() - var outVal = startVal + numPartitions - var outTimeMs = startTimeMs + msPerPartitionBetweenRows - while (outVal <= endVal) { - packedRows.append((outTimeMs, outVal)) - outVal += numPartitions - outTimeMs += msPerPartitionBetweenRows - } - - RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]] - }.toList.asJava - } - - override def commit(end: Offset): Unit = {} - override def stop(): Unit = {} -} - -case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] { - override def createDataReader(): DataReader[Row] = new RateStreamBatchReader(vals) -} - -class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] { - var currentIndex = -1 - - override def next(): Boolean = { - // Return true as long as the new index is in the seq. - currentIndex += 1 - currentIndex < vals.size - } - - override def get(): Row = { - Row( - DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)), - vals(currentIndex)._2) - } - - override def close(): Unit = {} -} - -object RateStreamSourceV2 { - val NUM_PARTITIONS = "numPartitions" - val ROWS_PER_SECOND = "rowsPerSecond" - - private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = { - RateStreamOffset( - Range(0, numPartitions).map { i => - // Note that the starting offset is exclusive, so we have to decrement the starting value - // by the increment that will later be applied. The first row output in each - // partition will have a value equal to the partition index. - (i, - ((i - numPartitions).toLong, - creationTimeMs)) - }.toMap) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala new file mode 100644 index 0000000..129cfed --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SerializedOffset.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +/** + * Used when loading a JSON serialized offset from external storage. + * We are currently not responsible for converting JSON serialized + * data into an internal (i.e., object) representation. Sources should + * define a factory method in their source Offset companion objects + * that accepts a [[SerializedOffset]] for doing the conversion. + */ +case class SerializedOffset(override val json: String) extends Offset + + http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index dbb408f..311942f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 7946889..129995d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 770db40..a3f3662 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -19,8 +19,6 @@ package 
org.apache.spark.sql.execution.streaming import scala.collection.{immutable, GenTraversableOnce} -import org.apache.spark.sql.sources.v2.reader.Offset - /** * A helper class that looks like a Map[Source, Offset]. */ http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index 77fc267..4c3a1ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -25,7 +25,8 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset} +import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index db07175..3041d4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala deleted file mode 100644 index 437040c..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import javax.annotation.concurrent.GuardedBy - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.util.control.NonFatal - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} -import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} -import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.types.StructType - -/** - * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit - * tests and does not provide durability. - */ -class MemorySinkV2 extends DataSourceV2 - with MicroBatchWriteSupport with ContinuousWriteSupport with Logging { - - override def createMicroBatchWriter( - queryId: String, - batchId: Long, - schema: StructType, - mode: OutputMode, - options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = { - java.util.Optional.of(new MemoryWriter(this, batchId, mode)) - } - - override def createContinuousWriter( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { - java.util.Optional.of(new ContinuousMemoryWriter(this, mode)) - } - - private case class AddedData(batchId: Long, data: Array[Row]) - - /** An order list of batches that have been written to this [[Sink]]. */ - @GuardedBy("this") - private val batches = new ArrayBuffer[AddedData]() - - /** Returns all rows that are stored in this [[Sink]]. 
*/ - def allData: Seq[Row] = synchronized { - batches.flatMap(_.data) - } - - def latestBatchId: Option[Long] = synchronized { - batches.lastOption.map(_.batchId) - } - - def latestBatchData: Seq[Row] = synchronized { - batches.lastOption.toSeq.flatten(_.data) - } - - def toDebugString: String = synchronized { - batches.map { case AddedData(batchId, data) => - val dataStr = try data.mkString(" ") catch { - case NonFatal(e) => "[Error converting to string]" - } - s"$batchId: $dataStr" - }.mkString("\n") - } - - def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { - val notCommitted = synchronized { - latestBatchId.isEmpty || batchId > latestBatchId.get - } - if (notCommitted) { - logDebug(s"Committing batch $batchId to $this") - outputMode match { - case Append | Update => - val rows = AddedData(batchId, newRows) - synchronized { batches += rows } - - case Complete => - val rows = AddedData(batchId, newRows) - synchronized { - batches.clear() - batches += rows - } - - case _ => - throw new IllegalArgumentException( - s"Output mode $outputMode is not supported by MemorySink") - } - } else { - logDebug(s"Skipping already committed batch: $batchId") - } - } - - def clear(): Unit = synchronized { - batches.clear() - } - - override def toString(): String = "MemorySink" -} - -case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} - -class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode) - extends DataSourceV2Writer with Logging { - - override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) - - def commit(messages: Array[WriterCommitMessage]): Unit = { - val newRows = messages.flatMap { - case message: MemoryWriterCommitMessage => message.data - } - sink.write(batchId, outputMode, newRows) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - // Don't accept any of the new input. - } -} - -class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode) - extends ContinuousWriter { - - override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) - - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { - val newRows = messages.flatMap { - case message: MemoryWriterCommitMessage => message.data - } - sink.write(epochId, outputMode, newRows) - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - // Don't accept any of the new input. 
- } -} - -case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] { - def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { - new MemoryDataWriter(partitionId, outputMode) - } -} - -class MemoryDataWriter(partition: Int, outputMode: OutputMode) - extends DataWriter[Row] with Logging { - - private val data = mutable.Buffer[Row]() - - override def write(row: Row): Unit = { - data.append(row) - } - - override def commit(): MemoryWriterCommitMessage = { - val msg = MemoryWriterCommitMessage(partition, data.clone()) - data.clear() - msg - } - - override def abort(): Unit = {} -} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 440cae0..0b22cbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala new file mode 100644 index 0000000..45dc7d7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.sources + +import java.util.Optional + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.json4s.DefaultFormats +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.RateStreamOffset +import org.apache.spark.sql.sources.v2.DataSourceV2Options +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} +import org.apache.spark.util.SystemClock + +class RateStreamV2Reader(options: DataSourceV2Options) + extends MicroBatchReader { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + val clock = new SystemClock + + private val numPartitions = + options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt + private val rowsPerSecond = + options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong + + // The interval (in milliseconds) between rows in each partition. + // e.g. if there are 4 global rows per second, and 2 partitions, each partition + // should output rows every (1000 * 2 / 4) = 500 ms. + private val msPerPartitionBetweenRows = (1000 * numPartitions) / rowsPerSecond + + override def readSchema(): StructType = { + StructType( + StructField("timestamp", TimestampType, false) :: + StructField("value", LongType, false) :: Nil) + } + + val creationTimeMs = clock.getTimeMillis() + + private var start: RateStreamOffset = _ + private var end: RateStreamOffset = _ + + override def setOffsetRange( + start: Optional[Offset], + end: Optional[Offset]): Unit = { + this.start = start.orElse( + RateStreamSourceV2.createInitialOffset(numPartitions, creationTimeMs)) + .asInstanceOf[RateStreamOffset] + + this.end = end.orElse { + val currentTime = clock.getTimeMillis() + RateStreamOffset( + this.start.partitionToValueAndRunTimeMs.map { + case startOffset @ (part, (currentVal, currentReadTime)) => + // Calculate the number of rows we should advance in this partition (based on the + // current time), and output a corresponding offset. 
+ val readInterval = currentTime - currentReadTime + val numNewRows = readInterval / msPerPartitionBetweenRows + if (numNewRows <= 0) { + startOffset + } else { + (part, + (currentVal + (numNewRows * numPartitions), + currentReadTime + (numNewRows * msPerPartitionBetweenRows))) + } + } + ) + }.asInstanceOf[RateStreamOffset] + } + + override def getStartOffset(): Offset = { + if (start == null) throw new IllegalStateException("start offset not set") + start + } + override def getEndOffset(): Offset = { + if (end == null) throw new IllegalStateException("end offset not set") + end + } + + override def deserializeOffset(json: String): Offset = { + RateStreamOffset(Serialization.read[Map[Int, (Long, Long)]](json)) + } + + override def createReadTasks(): java.util.List[ReadTask[Row]] = { + val startMap = start.partitionToValueAndRunTimeMs + val endMap = end.partitionToValueAndRunTimeMs + endMap.keys.toSeq.map { part => + val (endVal, _) = endMap(part) + val (startVal, startTimeMs) = startMap(part) + + val packedRows = mutable.ListBuffer[(Long, Long)]() + var outVal = startVal + numPartitions + var outTimeMs = startTimeMs + msPerPartitionBetweenRows + while (outVal <= endVal) { + packedRows.append((outTimeMs, outVal)) + outVal += numPartitions + outTimeMs += msPerPartitionBetweenRows + } + + RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]] + }.toList.asJava + } + + override def commit(end: Offset): Unit = {} + override def stop(): Unit = {} +} + +case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] { + override def createDataReader(): DataReader[Row] = new RateStreamBatchReader(vals) +} + +class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] { + var currentIndex = -1 + + override def next(): Boolean = { + // Return true as long as the new index is in the seq. + currentIndex += 1 + currentIndex < vals.size + } + + override def get(): Row = { + Row( + DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(vals(currentIndex)._1)), + vals(currentIndex)._2) + } + + override def close(): Unit = {} +} + +object RateStreamSourceV2 { + val NUM_PARTITIONS = "numPartitions" + val ROWS_PER_SECOND = "rowsPerSecond" + + private[sql] def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = { + RateStreamOffset( + Range(0, numPartitions).map { i => + // Note that the starting offset is exclusive, so we have to decrement the starting value + // by the increment that will later be applied. The first row output in each + // partition will have a value equal to the partition index. + (i, + ((i - numPartitions).toLong, + creationTimeMs)) + }.toMap) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala new file mode 100644 index 0000000..94c5dd6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.sources + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update} +import org.apache.spark.sql.execution.streaming.Sink +import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport} +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.StructType + +/** + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit + * tests and does not provide durability. + */ +class MemorySinkV2 extends DataSourceV2 + with MicroBatchWriteSupport with ContinuousWriteSupport with Logging { + + override def createMicroBatchWriter( + queryId: String, + batchId: Long, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = { + java.util.Optional.of(new MemoryWriter(this, batchId, mode)) + } + + override def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { + java.util.Optional.of(new ContinuousMemoryWriter(this, mode)) + } + + private case class AddedData(batchId: Long, data: Array[Row]) + + /** An ordered list of batches that have been written to this [[Sink]]. */ + @GuardedBy("this") + private val batches = new ArrayBuffer[AddedData]() + + /** Returns all rows that are stored in this [[Sink]].
*/ + def allData: Seq[Row] = synchronized { + batches.flatMap(_.data) + } + + def latestBatchId: Option[Long] = synchronized { + batches.lastOption.map(_.batchId) + } + + def latestBatchData: Seq[Row] = synchronized { + batches.lastOption.toSeq.flatten(_.data) + } + + def toDebugString: String = synchronized { + batches.map { case AddedData(batchId, data) => + val dataStr = try data.mkString(" ") catch { + case NonFatal(e) => "[Error converting to string]" + } + s"$batchId: $dataStr" + }.mkString("\n") + } + + def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = { + val notCommitted = synchronized { + latestBatchId.isEmpty || batchId > latestBatchId.get + } + if (notCommitted) { + logDebug(s"Committing batch $batchId to $this") + outputMode match { + case Append | Update => + val rows = AddedData(batchId, newRows) + synchronized { batches += rows } + + case Complete => + val rows = AddedData(batchId, newRows) + synchronized { + batches.clear() + batches += rows + } + + case _ => + throw new IllegalArgumentException( + s"Output mode $outputMode is not supported by MemorySink") + } + } else { + logDebug(s"Skipping already committed batch: $batchId") + } + } + + def clear(): Unit = synchronized { + batches.clear() + } + + override def toString(): String = "MemorySink" +} + +case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} + +class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode) + extends DataSourceV2Writer with Logging { + + override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) + + def commit(messages: Array[WriterCommitMessage]): Unit = { + val newRows = messages.flatMap { + case message: MemoryWriterCommitMessage => message.data + } + sink.write(batchId, outputMode, newRows) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // Don't accept any of the new input. + } +} + +class ContinuousMemoryWriter(val sink: MemorySinkV2, outputMode: OutputMode) + extends ContinuousWriter { + + override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { + val newRows = messages.flatMap { + case message: MemoryWriterCommitMessage => message.data + } + sink.write(epochId, outputMode, newRows) + } + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // Don't accept any of the new input. 
+ } +} + +case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] { + def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = { + new MemoryDataWriter(partitionId, outputMode) + } +} + +class MemoryDataWriter(partition: Int, outputMode: OutputMode) + extends DataWriter[Row] with Logging { + + private val data = mutable.Buffer[Row]() + + override def write(row: Row): Unit = { + data.append(row) + } + + override def commit(): MemoryWriterCommitMessage = { + val msg = MemoryWriterCommitMessage(partition, data.clone()) + data.clear() + msg + } + + override def abort(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala index be4b490..00d4f0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import org.scalatest.BeforeAndAfter import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.streaming.{OutputMode, StreamTest} class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala index ef801ce..6514c5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.continuous._ +import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader} import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport} import org.apache.spark.sql.streaming.StreamTest http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index f4fa7fa..39bb572 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} import 
org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index 4297482..f208f9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.{LongOffset, SerializedOffset} -import org.apache.spark.sql.sources.v2.reader.Offset +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset} trait OffsetSuite extends SparkFunSuite { /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */ http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index fa4b2dd..7554903 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index fb88c5d..71a474e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore -import org.apache.spark.sql.sources.v2.reader.Offset import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, SystemClock, Utils} http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git 

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index fa4b2dd..7554903 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreCon
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index fb88c5d..71a474e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 38aa517..97e0651 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.sql.streaming.util.{MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index fc9ac2a..9ff02de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.sql.{Encoder, SparkSession}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.util.JsonProtocol

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index ad4d3ab..2fa4595 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -33,12 +33,10 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ManualClock
 
-
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar {
 
   import AwaitTerminationTester._

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 952908f..aa163d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
 import org.apache.spark.sql.streaming.Trigger._
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/7798c9e6/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index 9a35f09..19ab2ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.streaming.util
 import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.sql.{SQLContext, _}
-import org.apache.spark.sql.execution.streaming.{LongOffset, Sink, Source}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
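
BlockingSource above is representative of the third-party code this PR protects: a Source whose getOffset and getBatch signatures reference execution.streaming.Offset rather than sources.v2.reader.Offset. A minimal sketch of such a source, assuming a hypothetical SingleBatchSource that serves one fixed batch:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    class SingleBatchSource(spark: SparkSession) extends Source {
      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

      // Both signatures compile against the restored Offset class, as they did pre-2.3.
      override def getOffset: Option[Offset] = Some(LongOffset(0))
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        import spark.implicits._
        Seq(1, 2, 3).toDF("a")
      }

      override def stop(): Unit = {}
    }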