Repository: spark Updated Branches: refs/heads/branch-2.1 e8866f9fc -> c4de90fc7
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty ## What changes were proposed in this pull request? Right now `StreamingQuery.lastProgress` throws NoSuchElementException when there are no progress updates, which makes it hard to use from Python because Python users will just see a Py4JError. This PR makes it return null instead. ## How was this patch tested? `test("lastProgress should be null when recentProgress is empty")` Author: Shixiong Zhu <shixi...@databricks.com> Closes #16273 from zsxwing/SPARK-18852. (cherry picked from commit 1ac6567bdb03d7cc5c5f3473827a102280cb1030) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4de90fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4de90fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4de90fc Branch: refs/heads/branch-2.1 Commit: c4de90fc76d5aa5d2c8fee4ed692d4ab922cbab0 Parents: e8866f9 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Dec 14 13:36:41 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Dec 14 13:36:55 2016 -0800 ---------------------------------------------------------------------- python/pyspark/sql/streaming.py | 9 ++- python/pyspark/sql/tests.py | 18 ++++- .../execution/streaming/ProgressReporter.scala | 4 +- .../streaming/StreamingQueryManagerSuite.scala | 9 +-- .../sql/streaming/StreamingQuerySuite.scala | 21 +++++- .../sql/streaming/util/BlockingSource.scala | 72 ++++++++++++++++++++ .../sql/streaming/util/DefaultSource.scala | 66 ------------------ 7 files changed, 121 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 
9cfb3fe..eabd5ef 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -125,10 +125,15 @@ class StreamingQuery(object): @since(2.1) def lastProgress(self): """ - Returns the most recent :class:`StreamingQueryProgress` update of this streaming query. + Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or + None if there were no progress updates :return: a map """ - return json.loads(self._jsq.lastProgress().json()) + lastProgress = self._jsq.lastProgress() + if lastProgress: + return json.loads(lastProgress.json()) + else: + return None @since(2.0) def processAllAvailable(self): http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 66320bd..115b4a9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1119,9 +1119,25 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - q = df.writeStream \ + + def func(x): + time.sleep(1) + return x + + from pyspark.sql.functions import col, udf + sleep_udf = udf(func) + + # Use "sleep_udf" to delay the progress update so that we can test `lastProgress` when there + # were no updates. + q = df.select(sleep_udf(col("value")).alias('value')).writeStream \ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: + # "lastProgress" will return None in most cases. However, as it may be flaky when + # Jenkins is very slow, we don't assert it. If there is something wrong, "lastProgress" + # may throw error with a high chance and make this test flaky, so we should still be + # able to detect broken codes. 
+ q.lastProgress + q.processAllAvailable() lastProgress = q.lastProgress recentProgress = q.recentProgress http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 549b936..e40135f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -100,9 +100,9 @@ trait ProgressReporter extends Logging { progressBuffer.toArray } - /** Returns the most recent query progress update. */ + /** Returns the most recent query progress update or null if there were no progress updates. */ def lastProgress: StreamingQueryProgress = progressBuffer.synchronized { - progressBuffer.last + progressBuffer.lastOption.orNull } /** Begins recording statistics about query progress for a given trigger. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index d188319..1742a54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { @@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { test("SPARK-18811: Source resolution should not block main thread") { failAfter(streamingTimeout) { - StreamingQueryManagerSuite.latch = new CountDownLatch(1) + BlockingSource.latch = new CountDownLatch(1) withTempDir { tempDir => // if source resolution was happening on the main thread, it would block the start call, // now it should only be blocking the stream execution thread @@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { eventually(Timeout(streamingTimeout)) { assert(sq.status.message.contains("Initializing sources")) } - StreamingQueryManagerSuite.latch.countDown() + BlockingSource.latch.countDown() sq.stop() } } @@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { (inputData, mapped) } } - -object StreamingQueryManagerSuite { - var latch: CountDownLatch = null -} 
http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index afd788c..b052bd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.streaming -import scala.collection.JavaConverters._ +import java.util.concurrent.CountDownLatch import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics @@ -32,6 +32,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.ManualClock @@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } + test("lastProgress should be null when recentProgress is empty") { + BlockingSource.latch = new CountDownLatch(1) + withTempDir { tempDir => + val sq = spark.readStream + .format("org.apache.spark.sql.streaming.util.BlockingSource") + .load() + .writeStream + .format("org.apache.spark.sql.streaming.util.BlockingSource") + .option("checkpointLocation", tempDir.toString) + .start() + // Creating source is blocked so recentProgress is empty and lastProgress should be null + assert(sq.lastProgress === null) + // Release the latch and stop the query + BlockingSource.latch.countDown() + sq.stop() + } + } + test("codahale metrics") { val inputData = MemoryStream[Int] 
http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala new file mode 100644 index 0000000..19ab2ff --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.util + +import java.util.concurrent.CountDownLatch + +import org.apache.spark.sql.{SQLContext, _} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} +import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. 
*/ +class BlockingSource extends StreamSourceProvider with StreamSinkProvider { + + private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) + + override def sourceSchema( + spark: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): (String, StructType) = { + ("dummySource", fakeSchema) + } + + override def createSource( + spark: SQLContext, + metadataPath: String, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): Source = { + BlockingSource.latch.await() + new Source { + override def schema: StructType = fakeSchema + override def getOffset: Option[Offset] = Some(new LongOffset(0)) + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + import spark.implicits._ + Seq[Int]().toDS().toDF() + } + override def stop() {} + } + } + + override def createSink( + spark: SQLContext, + parameters: Map[String, String], + partitionColumns: Seq[String], + outputMode: OutputMode): Sink = { + new Sink { + override def addBatch(batchId: Long, data: DataFrame): Unit = {} + } + } +} + +object BlockingSource { + var latch: CountDownLatch = null +} http://git-wip-us.apache.org/repos/asf/spark/blob/c4de90fc/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala deleted file mode 100644 index b0adf76..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming.util - -import org.apache.spark.sql.{SQLContext, _} -import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} -import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite} -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} - -/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. 
*/ -class BlockingSource extends StreamSourceProvider with StreamSinkProvider { - - private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil) - - override def sourceSchema( - spark: SQLContext, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): (String, StructType) = { - ("dummySource", fakeSchema) - } - - override def createSource( - spark: SQLContext, - metadataPath: String, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): Source = { - StreamingQueryManagerSuite.latch.await() - new Source { - override def schema: StructType = fakeSchema - override def getOffset: Option[Offset] = Some(new LongOffset(0)) - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - import spark.implicits._ - Seq[Int]().toDS().toDF() - } - override def stop() {} - } - } - - override def createSink( - spark: SQLContext, - parameters: Map[String, String], - partitionColumns: Seq[String], - outputMode: OutputMode): Sink = { - new Sink { - override def addBatch(batchId: Long, data: DataFrame): Unit = {} - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org