Repository: spark Updated Branches: refs/heads/master afd9bc1d8 -> 83a6ace0d
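For context, a minimal usage sketch (not part of the commit) of the update output mode that this change makes public through DataStreamWriter. The socket source on localhost:9999, the console sink, and the word-count query are illustrative assumptions, not taken from the patch:

// Illustrative sketch, assuming a socket source and the console sink.
// In update mode, only rows whose aggregate values changed in a trigger are emitted.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object UpdateModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("update-mode-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Assumed source: lines of text arriving on a socket.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Running word count; the aggregation is what makes update mode interesting.
    val counts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // Either the string form accepted by this patch ("update") or the
    // now-public OutputMode.Update() can be used here.
    val query = counts.writeStream
      .outputMode(OutputMode.Update())   // or .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}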
[SPARK-18234][SS] Made update mode public ## What changes were proposed in this pull request? Made update mode public. As part of that here are the changes. - Update DatastreamWriter to accept "update" - Changed package of InternalOutputModes from o.a.s.sql to o.a.s.sql.catalyst - Added update mode state removing with watermark to StateStoreSaveExec ## How was this patch tested? Added new tests in changed modules Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #16360 from tdas/SPARK-18234. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83a6ace0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83a6ace0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83a6ace0 Branch: refs/heads/master Commit: 83a6ace0d1be44f70e768348ae6688798c84343e Parents: afd9bc1 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Wed Dec 21 16:43:17 2016 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Dec 21 16:43:17 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/sql/streaming/OutputMode.java | 12 +- .../apache/spark/sql/InternalOutputModes.scala | 47 --- .../analysis/UnsupportedOperationChecker.scala | 3 +- .../streaming/InternalOutputModes.scala | 47 +++ .../analysis/UnsupportedOperationsSuite.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/streaming/StatefulAggregate.scala | 61 ++-- .../spark/sql/execution/streaming/memory.scala | 5 +- .../spark/sql/streaming/DataStreamWriter.scala | 17 +- .../execution/streaming/MemorySinkSuite.scala | 287 +++++++++++++++++++ .../sql/streaming/EventTimeWatermarkSuite.scala | 55 +++- .../sql/streaming/FileStreamSinkSuite.scala | 22 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../spark/sql/streaming/MemorySinkSuite.scala | 274 ------------------ .../spark/sql/streaming/StreamSuite.scala | 8 +- .../streaming/StreamingAggregationSuite.scala | 2 +- .../test/DataStreamReaderWriterSuite.scala | 38 ++- 17 files changed, 507 insertions(+), 377 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java index a515c1a..cf0579f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming; import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.InternalOutputModes; +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes; /** * :: Experimental :: @@ -54,4 +54,14 @@ public class OutputMode { public static OutputMode Complete() { return InternalOutputModes.Complete$.MODULE$; } + + /** + * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will + * be written to the sink every time there are some updates. 
+ * + * @since 2.1.1 + */ + public static OutputMode Update() { + return InternalOutputModes.Update$.MODULE$; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala deleted file mode 100644 index 594c41c..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.sql.streaming.OutputMode - -/** - * Internal helper class to generate objects representing various `OutputMode`s, - */ -private[sql] object InternalOutputModes { - - /** - * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be - * written to the sink. This output mode can be only be used in queries that do not - * contain any aggregation. - */ - case object Append extends OutputMode - - /** - * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written - * to the sink every time these is some updates. This output mode can only be used in queries - * that contain aggregations. - */ - case object Complete extends OutputMode - - /** - * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be - * written to the sink every time these is some updates. This output mode can only be used in - * queries that contain aggregations. 
- */ - case object Update extends OutputMode -} http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c4a78f9..60d9881 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.{AnalysisException, InternalOutputModes} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.streaming.OutputMode /** http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala new file mode 100644 index 0000000..915f4a9 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import org.apache.spark.sql.streaming.OutputMode + +/** + * Internal helper class to generate objects representing various `OutputMode`s, + */ +private[sql] object InternalOutputModes { + + /** + * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be + * written to the sink. This output mode can be only be used in queries that do not + * contain any aggregation. + */ + case object Append extends OutputMode + + /** + * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates. This output mode can only be used in queries + * that contain aggregations. 
+ */ + case object Complete extends OutputMode + + /** + * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be + * written to the sink every time these is some updates. This output mode can only be used in + * queries that contain aggregations. + */ + case object Update extends OutputMode +} http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 34e94c7..94a008f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -27,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.IntegerType http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5245c14..ac3f068 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -278,7 +278,7 @@ case class DataSource( throw new IllegalArgumentException("'path' is not specified") }) if (outputMode != OutputMode.Append) { - throw new IllegalArgumentException( + throw new AnalysisException( s"Data source $className does not support $outputMode output mode") } new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions) http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 7af978a..0551e4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -21,11 +21,11 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution -import org.apache.spark.sql.InternalOutputModes._ -import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.execution.SparkPlan @@ -108,6 +108,30 @@ case class StateStoreSaveExec( "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"), "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows")) + /** Generate a predicate that matches data older than the watermark */ + private lazy val watermarkPredicate: Option[Predicate] = { + val optionalWatermarkAttribute = + keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)) + + optionalWatermarkAttribute.map { watermarkAttribute => + // If we are evicting based on a window, use the end of the window. Otherwise just + // use the attribute itself. + val evictionExpression = + if (watermarkAttribute.dataType.isInstanceOf[StructType]) { + LessThanOrEqual( + GetStructField(watermarkAttribute, 1), + Literal(eventTimeWatermark.get * 1000)) + } else { + LessThanOrEqual( + watermarkAttribute, + Literal(eventTimeWatermark.get * 1000)) + } + + logInfo(s"Filtering state store on: $evictionExpression") + newPredicate(evictionExpression, keyExpressions) + } + } + override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver assert(outputMode.nonEmpty, @@ -151,25 +175,8 @@ case class StateStoreSaveExec( numUpdatedStateRows += 1 } - val watermarkAttribute = - keyExpressions.find(_.metadata.contains(EventTimeWatermark.delayKey)).get - // If we are evicting based on a window, use the end of the window. Otherwise just - // use the attribute itself. - val evictionExpression = - if (watermarkAttribute.dataType.isInstanceOf[StructType]) { - LessThanOrEqual( - GetStructField(watermarkAttribute, 1), - Literal(eventTimeWatermark.get * 1000)) - } else { - LessThanOrEqual( - watermarkAttribute, - Literal(eventTimeWatermark.get * 1000)) - } - - logInfo(s"Filtering state store on: $evictionExpression") - val predicate = newPredicate(evictionExpression, keyExpressions) - store.remove(predicate.eval) - + // Assumption: Append mode can be done only when watermark has been specified + store.remove(watermarkPredicate.get.eval) store.commit() numTotalStateRows += store.numKeys() @@ -180,11 +187,19 @@ case class StateStoreSaveExec( // Update and output modified rows from the StateStore. 
case Some(Update) => + new Iterator[InternalRow] { - private[this] val baseIterator = iter + + // Filter late date using watermark if specified + private[this] val baseIterator = watermarkPredicate match { + case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row)) + case None => iter + } override def hasNext: Boolean = { if (!baseIterator.hasNext) { + // Remove old aggregates if watermark specified + if (watermarkPredicate.nonEmpty) store.remove(watermarkPredicate.get.eval) store.commit() numTotalStateRows += store.numKeys() false http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index b699be2..91da6b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -193,11 +194,11 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi if (notCommitted) { logDebug(s"Committing batch $batchId to $this") outputMode match { - case InternalOutputModes.Append | InternalOutputModes.Update => + case Append | Update => val rows = AddedData(batchId, data.collect()) synchronized { batches += rows } - case InternalOutputModes.Complete => + case Complete => val rows = AddedData(batchId, data.collect()) synchronized { batches.clear() http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b7fc336..6c0c5e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} @@ -65,9 +66,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { OutputMode.Append case "complete" => OutputMode.Complete + case "update" => + OutputMode.Update case _ => throw new IllegalArgumentException(s"Unknown output mode $outputMode. 
" + - "Accepted output modes are 'append' and 'complete'") + "Accepted output modes are 'append', 'complete', 'update'") } this } @@ -99,7 +102,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { this } - /** * Specifies the name of the [[StreamingQuery]] that can be started with `start()`. * This name must be unique among all the currently active queries in the associated SQLContext. @@ -219,7 +221,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - + val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'." + outputMode match { + case Append | Complete => // allowed + case Update => + throw new AnalysisException( + s"Update output mode is not supported for memory sink. $supportedModes") + case _ => + throw new AnalysisException( + s"$outputMode is not supported for memory sink. $supportedModes") + } val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) val chkpointLoc = extraOptions.get("checkpointLocation") http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala new file mode 100644 index 0000000..ca724fc --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.language.implicitConversions + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql._ +import org.apache.spark.sql.streaming.{OutputMode, StreamTest} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.util.Utils + +class MemorySinkSuite extends StreamTest with BeforeAndAfter { + + import testImplicits._ + + after { + sqlContext.streams.active.foreach(_.stop()) + } + + test("directly add data in Append output mode") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Append) + + // Before adding data, check output + assert(sink.latestBatchId === None) + checkAnswer(sink.latestBatchData, Seq.empty) + checkAnswer(sink.allData, Seq.empty) + + // Add batch 0 and check outputs + sink.addBatch(0, 1 to 3) + assert(sink.latestBatchId === Some(0)) + checkAnswer(sink.latestBatchData, 1 to 3) + checkAnswer(sink.allData, 1 to 3) + + // Add batch 1 and check outputs + sink.addBatch(1, 4 to 6) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data + + // Re-add batch 1 with different data, should not be added and outputs should not be changed + sink.addBatch(1, 7 to 9) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) + + // Add batch 2 and check outputs + sink.addBatch(2, 7 to 9) + assert(sink.latestBatchId === Some(2)) + checkAnswer(sink.latestBatchData, 7 to 9) + checkAnswer(sink.allData, 1 to 9) + } + + test("directly add data in Update output mode") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Update) + + // Before adding data, check output + assert(sink.latestBatchId === None) + checkAnswer(sink.latestBatchData, Seq.empty) + checkAnswer(sink.allData, Seq.empty) + + // Add batch 0 and check outputs + sink.addBatch(0, 1 to 3) + assert(sink.latestBatchId === Some(0)) + checkAnswer(sink.latestBatchData, 1 to 3) + checkAnswer(sink.allData, 1 to 3) + + // Add batch 1 and check outputs + sink.addBatch(1, 4 to 6) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data + + // Re-add batch 1 with different data, should not be added and outputs should not be changed + sink.addBatch(1, 7 to 9) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 1 to 6) + + // Add batch 2 and check outputs + sink.addBatch(2, 7 to 9) + assert(sink.latestBatchId === Some(2)) + checkAnswer(sink.latestBatchData, 7 to 9) + checkAnswer(sink.allData, 1 to 9) + } + + test("directly add data in Complete output mode") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Complete) + + // Before adding data, check output + assert(sink.latestBatchId === None) + checkAnswer(sink.latestBatchData, Seq.empty) + checkAnswer(sink.allData, Seq.empty) + + // Add batch 0 and check outputs + sink.addBatch(0, 1 to 3) + assert(sink.latestBatchId === Some(0)) + checkAnswer(sink.latestBatchData, 1 to 3) + checkAnswer(sink.allData, 1 to 3) + + // Add batch 1 and check outputs + sink.addBatch(1, 4 to 6) + assert(sink.latestBatchId === 
Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 4 to 6) // new data should replace old data + + // Re-add batch 1 with different data, should not be added and outputs should not be changed + sink.addBatch(1, 7 to 9) + assert(sink.latestBatchId === Some(1)) + checkAnswer(sink.latestBatchData, 4 to 6) + checkAnswer(sink.allData, 4 to 6) + + // Add batch 2 and check outputs + sink.addBatch(2, 7 to 9) + assert(sink.latestBatchId === Some(2)) + checkAnswer(sink.latestBatchData, 7 to 9) + checkAnswer(sink.allData, 7 to 9) + } + + + test("registering as a table in Append output mode - supported") { + val input = MemoryStream[Int] + val query = input.toDF().writeStream + .format("memory") + .outputMode("append") + .queryName("memStream") + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3, 4, 5, 6) + + query.stop() + } + + test("registering as a table in Complete output mode - supported") { + val input = MemoryStream[Int] + val query = input.toDF() + .groupBy("value") + .count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName("memStream") + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDatasetUnorderly( + spark.table("memStream").as[(Int, Long)], + (1, 1L), (2, 1L), (3, 1L)) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDatasetUnorderly( + spark.table("memStream").as[(Int, Long)], + (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L)) + + query.stop() + } + + test("registering as a table in Update output mode - not supported") { + val input = MemoryStream[Int] + val df = input.toDF() + .groupBy("value") + .count() + intercept[AnalysisException] { + df.writeStream + .format("memory") + .outputMode("update") + .queryName("memStream") + .start() + } + } + + test("MemoryPlan statistics") { + implicit val schema = new StructType().add(new StructField("value", IntegerType)) + val sink = new MemorySink(schema, OutputMode.Append) + val plan = new MemoryPlan(sink) + + // Before adding data, check output + checkAnswer(sink.allData, Seq.empty) + assert(plan.statistics.sizeInBytes === 0) + + sink.addBatch(0, 1 to 3) + assert(plan.statistics.sizeInBytes === 12) + + sink.addBatch(1, 4 to 6) + assert(plan.statistics.sizeInBytes === 24) + } + + ignore("stress test") { + // Ignore the stress test as it takes several minutes to run + (0 until 1000).foreach { _ => + val input = MemoryStream[Int] + val query = input.toDF().writeStream + .format("memory") + .queryName("memStream") + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3, 4, 5, 6) + + query.stop() + } + } + + test("error when no name is specified") { + val error = intercept[AnalysisException] { + val input = MemoryStream[Int] + val query = input.toDF().writeStream + .format("memory") + .start() + } + + assert(error.message contains "queryName must be specified") + } + + test("error if attempting to resume specific checkpoint") { + val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath + + val input = MemoryStream[Int] + val query = input.toDF().writeStream + .format("memory") + .queryName("memStream") + .option("checkpointLocation", 
location) + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + query.stop() + + intercept[AnalysisException] { + input.toDF().writeStream + .format("memory") + .queryName("memStream") + .option("checkpointLocation", location) + .start() + } + } + + private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = { + checkAnswer( + sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema), + intsToDF(expected)(schema)) + } + + private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = { + require(schema.fields.size === 1) + sqlContext.createDataset(seq).toDF(schema.fieldNames.head) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index bdfba95..23f51ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -19,15 +19,15 @@ package org.apache.spark.sql.streaming import java.{util => ju} import java.text.SimpleDateFormat -import java.util.{Calendar, Date} +import java.util.Date import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} -import org.apache.spark.sql.InternalOutputModes.Complete +import org.apache.spark.sql.streaming.OutputMode._ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging { @@ -117,7 +117,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } - test("append-mode watermark aggregation") { + test("append mode") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() @@ -129,11 +129,42 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin testStream(windowedAggregation)( AddData(inputData, 10, 11, 12, 13, 14, 15), - CheckAnswer(), - AddData(inputData, 25), // Advance watermark to 15 seconds - CheckAnswer(), - AddData(inputData, 25), // Evict items less than previous watermark. 
- CheckAnswer((10, 5)) + CheckLastBatch(), + AddData(inputData, 25), // Advance watermark to 15 seconds + CheckLastBatch(), + assertNumStateRows(3), + AddData(inputData, 25), // Emit items less than watermark and drop their state + CheckLastBatch((10, 5)), + assertNumStateRows(2), + AddData(inputData, 10), // Should not emit anything as data less than watermark + CheckLastBatch(), + assertNumStateRows(2) + ) + } + + test("update mode") { + val inputData = MemoryStream[Int] + spark.conf.set("spark.sql.shuffle.partitions", "10") + + val windowedAggregation = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .groupBy(window($"eventTime", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(windowedAggregation, OutputMode.Update)( + AddData(inputData, 10, 11, 12, 13, 14, 15), + CheckLastBatch((10, 5), (15, 1)), + AddData(inputData, 25), // Advance watermark to 15 seconds + CheckLastBatch((25, 1)), + assertNumStateRows(3), + AddData(inputData, 10, 25), // Ignore 10 as its less than watermark + CheckLastBatch((25, 2)), + assertNumStateRows(2), + AddData(inputData, 10), // Should not emit anything as data less than watermark + CheckLastBatch(), + assertNumStateRows(2) ) } @@ -271,6 +302,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => + val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get + assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) + true + } + private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = { AssertOnQuery { q => body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 54efae3..22f59f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.streaming -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{AnalysisException, DataFrame} import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex} @@ -210,6 +210,26 @@ class FileStreamSinkSuite extends StreamTest { } } + test("Update and Complete output mode not supported") { + val df = MemoryStream[Int].toDF().groupBy().count() + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + + withTempDir { dir => + + def testOutputMode(mode: String): Unit = { + val e = intercept[AnalysisException] { + df.writeStream.format("parquet").outputMode(mode).start(dir.getCanonicalPath) + } + Seq(mode, "not support").foreach { w => + assert(e.getMessage.toLowerCase.contains(w)) + } + } + + testOutputMode("update") + testOutputMode("complete") + } + } + test("parquet") { testFormat(None) // should not throw error as default format 
parquet when not specified testFormat(Some("parquet")) http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 2d218f4..55d927a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -899,7 +899,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // This is to avoid actually running a Spark job with 10000 tasks val df = files.filter("1 == 0").groupBy().count() - testStream(df, InternalOutputModes.Complete)( + testStream(df, OutputMode.Complete)( AddTextFileData("0", src, tmp), CheckAnswer(0) ) http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala deleted file mode 100644 index 4e9fba9..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.streaming - -import scala.language.implicitConversions - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} -import org.apache.spark.util.Utils - -class MemorySinkSuite extends StreamTest with BeforeAndAfter { - - import testImplicits._ - - after { - sqlContext.streams.active.foreach(_.stop()) - } - - test("directly add data in Append output mode") { - implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, InternalOutputModes.Append) - - // Before adding data, check output - assert(sink.latestBatchId === None) - checkAnswer(sink.latestBatchData, Seq.empty) - checkAnswer(sink.allData, Seq.empty) - - // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) - assert(sink.latestBatchId === Some(0)) - checkAnswer(sink.latestBatchData, 1 to 3) - checkAnswer(sink.allData, 1 to 3) - - // Add batch 1 and check outputs - sink.addBatch(1, 4 to 6) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data - - // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 1 to 6) - - // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) - assert(sink.latestBatchId === Some(2)) - checkAnswer(sink.latestBatchData, 7 to 9) - checkAnswer(sink.allData, 1 to 9) - } - - test("directly add data in Update output mode") { - implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, InternalOutputModes.Update) - - // Before adding data, check output - assert(sink.latestBatchId === None) - checkAnswer(sink.latestBatchData, Seq.empty) - checkAnswer(sink.allData, Seq.empty) - - // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) - assert(sink.latestBatchId === Some(0)) - checkAnswer(sink.latestBatchData, 1 to 3) - checkAnswer(sink.allData, 1 to 3) - - // Add batch 1 and check outputs - sink.addBatch(1, 4 to 6) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data - - // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 1 to 6) - - // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) - assert(sink.latestBatchId === Some(2)) - checkAnswer(sink.latestBatchData, 7 to 9) - checkAnswer(sink.allData, 1 to 9) - } - - test("directly add data in Complete output mode") { - implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, InternalOutputModes.Complete) - - // Before adding data, check output - assert(sink.latestBatchId === None) - checkAnswer(sink.latestBatchData, Seq.empty) - checkAnswer(sink.allData, Seq.empty) - - // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) - assert(sink.latestBatchId === Some(0)) - checkAnswer(sink.latestBatchData, 1 to 3) - checkAnswer(sink.allData, 1 to 3) - - // Add batch 1 and check outputs - 
sink.addBatch(1, 4 to 6) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 4 to 6) // new data should replace old data - - // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) - assert(sink.latestBatchId === Some(1)) - checkAnswer(sink.latestBatchData, 4 to 6) - checkAnswer(sink.allData, 4 to 6) - - // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) - assert(sink.latestBatchId === Some(2)) - checkAnswer(sink.latestBatchData, 7 to 9) - checkAnswer(sink.allData, 7 to 9) - } - - - test("registering as a table in Append output mode") { - val input = MemoryStream[Int] - val query = input.toDF().writeStream - .format("memory") - .outputMode("append") - .queryName("memStream") - .start() - input.addData(1, 2, 3) - query.processAllAvailable() - - checkDataset( - spark.table("memStream").as[Int], - 1, 2, 3) - - input.addData(4, 5, 6) - query.processAllAvailable() - checkDataset( - spark.table("memStream").as[Int], - 1, 2, 3, 4, 5, 6) - - query.stop() - } - - test("registering as a table in Complete output mode") { - val input = MemoryStream[Int] - val query = input.toDF() - .groupBy("value") - .count() - .writeStream - .format("memory") - .outputMode("complete") - .queryName("memStream") - .start() - input.addData(1, 2, 3) - query.processAllAvailable() - - checkDatasetUnorderly( - spark.table("memStream").as[(Int, Long)], - (1, 1L), (2, 1L), (3, 1L)) - - input.addData(4, 5, 6) - query.processAllAvailable() - checkDatasetUnorderly( - spark.table("memStream").as[(Int, Long)], - (1, 1L), (2, 1L), (3, 1L), (4, 1L), (5, 1L), (6, 1L)) - - query.stop() - } - - test("MemoryPlan statistics") { - implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, InternalOutputModes.Append) - val plan = new MemoryPlan(sink) - - // Before adding data, check output - checkAnswer(sink.allData, Seq.empty) - assert(plan.statistics.sizeInBytes === 0) - - sink.addBatch(0, 1 to 3) - assert(plan.statistics.sizeInBytes === 12) - - sink.addBatch(1, 4 to 6) - assert(plan.statistics.sizeInBytes === 24) - } - - ignore("stress test") { - // Ignore the stress test as it takes several minutes to run - (0 until 1000).foreach { _ => - val input = MemoryStream[Int] - val query = input.toDF().writeStream - .format("memory") - .queryName("memStream") - .start() - input.addData(1, 2, 3) - query.processAllAvailable() - - checkDataset( - spark.table("memStream").as[Int], - 1, 2, 3) - - input.addData(4, 5, 6) - query.processAllAvailable() - checkDataset( - spark.table("memStream").as[Int], - 1, 2, 3, 4, 5, 6) - - query.stop() - } - } - - test("error when no name is specified") { - val error = intercept[AnalysisException] { - val input = MemoryStream[Int] - val query = input.toDF().writeStream - .format("memory") - .start() - } - - assert(error.message contains "queryName must be specified") - } - - test("error if attempting to resume specific checkpoint") { - val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath - - val input = MemoryStream[Int] - val query = input.toDF().writeStream - .format("memory") - .queryName("memStream") - .option("checkpointLocation", location) - .start() - input.addData(1, 2, 3) - query.processAllAvailable() - query.stop() - - intercept[AnalysisException] { - input.toDF().writeStream - .format("memory") - .queryName("memStream") - .option("checkpointLocation", location) - .start() - } - } 
- - private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = { - checkAnswer( - sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema), - intsToDF(expected)(schema)) - } - - private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): DataFrame = { - require(schema.fields.size === 1) - sqlContext.createDataset(seq).toDF(schema.fieldNames.head) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 4a64054..b8fa82d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import scala.util.control.ControlThrowable import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -265,10 +266,9 @@ class StreamSuite extends StreamTest { } test("output mode API in Scala") { - val o1 = OutputMode.Append - assert(o1 === InternalOutputModes.Append) - val o2 = OutputMode.Complete - assert(o2 === InternalOutputModes.Complete) + assert(OutputMode.Append === InternalOutputModes.Append) + assert(OutputMode.Complete === InternalOutputModes.Complete) + assert(OutputMode.Update === InternalOutputModes.Update) } test("explain") { http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index fbe560e..eca2647 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -23,13 +23,13 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.OutputMode._ object FailureSinglton { var firstTime = true http://git-wip-us.apache.org/repos/asf/spark/blob/83a6ace0/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 9de3da3..097dd6e 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.mockito.Mockito._ -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import org.scalatest.PrivateMethodTester.PrivateMethod import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest} +import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -105,7 +106,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { } } -class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { +class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with PrivateMethodTester { private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath @@ -388,19 +389,40 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath - test("check outputMode(string) throws exception on unsupported modes") { - def testError(outputMode: String): Unit = { + test("supported strings in outputMode(string)") { + val outputModeMethod = PrivateMethod[OutputMode]('outputMode) + + def testMode(outputMode: String, expected: OutputMode): Unit = { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + val w = df.writeStream + w.outputMode(outputMode) + val setOutputMode = w invokePrivate outputModeMethod() + assert(setOutputMode === expected) + } + + testMode("append", OutputMode.Append) + testMode("Append", OutputMode.Append) + testMode("complete", OutputMode.Complete) + testMode("Complete", OutputMode.Complete) + testMode("update", OutputMode.Update) + testMode("Update", OutputMode.Update) + } + + test("unsupported strings in outputMode(string)") { + def testMode(outputMode: String): Unit = { + val acceptedModes = Seq("append", "update", "complete") val df = spark.readStream .format("org.apache.spark.sql.streaming.test") .load() val w = df.writeStream val e = intercept[IllegalArgumentException](w.outputMode(outputMode)) - Seq("output mode", "unknown", outputMode).foreach { s => + (Seq("output mode", "unknown", outputMode) ++ acceptedModes).foreach { s => assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) } } - testError("Update") - testError("Xyz") + testMode("Xyz") } test("check foreach() catches null writers") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
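For illustration only, a hedged sketch (not part of the commit) of the watermark-plus-update-mode behavior that the StateStoreSaveExec change enables: state for windows older than the watermark is dropped even in update mode, mirroring what the new EventTimeWatermarkSuite test exercises. The JSON file source, schema, input path, and console sink are assumptions:

// Illustrative sketch, assuming a JSON file source with an event-time column and the console sink.
// With this patch, update mode combined with a watermark lets StateStoreSaveExec evict
// state for windows older than the watermark, so the state store does not grow without bound.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object UpdateModeWatermarkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("update-watermark-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Assumed input schema and path.
    val schema = new StructType()
      .add("eventTime", TimestampType)
      .add("word", StringType)
    val events = spark.readStream.schema(schema).json("/tmp/events")

    // Aggregate into 5-second event-time windows; events arriving more than
    // 10 seconds behind the watermark are dropped, and their window state is evicted.
    val windowedCounts = events
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds"), $"word")
      .agg(count("*").as("count"))

    // Update mode emits only the windows whose counts changed in each trigger.
    val query = windowedCounts.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()

    query.awaitTermination()
  }
}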