[GitHub] spark pull request #20650: [SPARK-23408][SS] Synchronize successive AddData ...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20650

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20650#discussion_r169605919

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -217,6 +225,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             s"ExpectFailure[${causeClass.getName}, isFatalError: $isFatalError]"
         }
    +  case class StreamProgressLockedActions(actions: Seq[StreamAction], desc: String = null)
    --- End diff --

    TODO: add docs.
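For illustration only, here is one way the requested docs might read. It is written against hypothetical, self-contained stand-ins for the `StreamAction` types: the simplified `AddData` class, its `toString`, and the fallback description used when `desc` is null are assumptions, not the code in this PR. The doc comment restates the intent given in the PR description: the grouped actions run while the query cannot make progress, so their combined effect lands in a single batch.

```scala
// Hypothetical stand-ins for StreamTest's action types; NOT the real Spark classes.
sealed trait StreamAction

final case class AddData(source: String, data: Seq[Int]) extends StreamAction {
  override def toString: String = s"AddData($source, ${data.mkString("[", ",", "]")})"
}

/**
 * Groups several actions that are performed while the streaming query is
 * blocked from making progress, so that their combined effect becomes visible
 * to the query in the same micro-batch (for example, adding data to both
 * inputs of a stream-stream join).
 *
 * @param actions the actions to perform, in order, while progress is locked
 * @param desc    optional human-readable description; when null, the grouped
 *                actions themselves are listed by `toString` (an assumption)
 */
final case class StreamProgressLockedActions(actions: Seq[StreamAction], desc: String = null)
    extends StreamAction {
  override def toString: String =
    if (desc != null) desc else actions.mkString("StreamProgressLockedActions(", ", ", ")")
}
```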
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20650#discussion_r169605952

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -425,243 +444,248 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           }
         }
    -    var manualClockExpectedTime = -1L
    -    val defaultCheckpointLocation =
    -      Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
    -    try {
    -      startedTest.foreach { action =>
    -        logInfo(s"Processing test stream action: $action")
    -        action match {
    -          case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
    -            verify(currentStream == null, "stream already running")
    -            verify(triggerClock.isInstanceOf[SystemClock]
    -              || triggerClock.isInstanceOf[StreamManualClock],
    -              "Use either SystemClock or StreamManualClock to start the stream")
    -            if (triggerClock.isInstanceOf[StreamManualClock]) {
    -              manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
    +    def performAction(action: StreamAction): Unit = {
    --- End diff --

    TODO: Add docs.
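The diff turns the body of the `startedTest.foreach` loop into a named `performAction` method. One plausible benefit, sketched here under assumptions, is that a composite action can then call `performAction` recursively for each child while holding a lock. `FakeQuery`, `ActionRunner`, and the simplified action classes are hypothetical stand-ins, not Spark's test harness.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-ins for StreamTest's action types (not Spark's classes).
sealed trait StreamAction
final case class AddData(source: String, data: Seq[Int]) extends StreamAction
final case class StreamProgressLockedActions(actions: Seq[StreamAction]) extends StreamAction

// Hypothetical minimal "query": named sources plus the lock batch execution would hold.
final class FakeQuery {
  val progressLock = new ReentrantLock()
  val sources = mutable.Map.empty[String, mutable.Buffer[Int]]

  def withProgressLocked(f: => Unit): Unit = {
    progressLock.lock()
    try f finally progressLock.unlock()
  }
}

final class ActionRunner(query: FakeQuery) {
  /** Performed actions, each tagged with whether the progress lock was held. */
  val log = mutable.Buffer.empty[String]

  /**
   * Performs a single test action. Because this is a named method rather than
   * the body of a `foreach`, a composite action can call it recursively.
   */
  def performAction(action: StreamAction): Unit = action match {
    case AddData(source, data) =>
      query.sources.getOrElseUpdate(source, mutable.Buffer.empty[Int]) ++= data
      log += s"add $source locked=${query.progressLock.isHeldByCurrentThread}"
    case StreamProgressLockedActions(actions) =>
      // Hold the lock across the whole group so no batch can run in between.
      query.withProgressLocked { actions.foreach(performAction) }
  }
}
```

In this sketch the recursion is what keeps the grouped action small: it reuses the existing per-action logic instead of duplicating it inside the locked branch.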
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20650#discussion_r169605887

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -102,6 +102,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           AddDataMemory(source, data)
       }
    +  object MultiAddData {
    --- End diff --

    TODO: add docs.
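A hedged sketch of how `MultiAddData` could work as a small factory object that packages two additions into one progress-locked action, in line with the PR's description of it as an alias used to add data to two memory sources at once. `MemorySource`, the simplified `AddDataMemory`, and the `[ a | b ]` description format are hypothetical stand-ins, not Spark's `MemoryStream` API.

```scala
// Hypothetical stand-ins; not Spark's MemoryStream / AddDataMemory.
final class MemorySource[A](val name: String)

sealed trait StreamAction

final case class AddDataMemory[A](source: MemorySource[A], data: Seq[A]) extends StreamAction {
  override def toString: String = s"AddData to ${source.name}: ${data.mkString(",")}"
}

final case class StreamProgressLockedActions(actions: Seq[StreamAction], desc: String = null)
    extends StreamAction {
  override def toString: String =
    if (desc != null) desc else actions.mkString("StreamProgressLockedActions(", ", ", ")")
}

/**
 * Adds data to two sources as a single action. Both additions are performed
 * while the query is blocked from making progress, so the next batch sees
 * both of them or neither.
 */
object MultiAddData {
  def apply[A](source1: MemorySource[A], data1: A*)(source2: MemorySource[A], data2: A*): StreamAction = {
    val actions = Seq(AddDataMemory(source1, data1), AddDataMemory(source2, data2))
    // Describe the group by its parts so test failure messages stay readable.
    StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]"))
  }
}
```

The two parameter lists keep each source next to its own varargs data, e.g. `MultiAddData(left, 1, 2)(right, 3)`.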
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20650#discussion_r169604484

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -543,6 +543,15 @@ abstract class StreamExecution(
         Option(name).map(_ + "<br/>").getOrElse("") +
           s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
       }
    +
    +  private[sql] def withProgressLocked(f: => Unit): Unit = {
    --- End diff --

    TODO: Add docs.
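One possible shape for `withProgressLocked`, with the kind of scaladoc the TODO asks for, assuming the micro-batch loop holds a `java.util.concurrent.locks.ReentrantLock` while it runs a batch. The class name `ProgressLocking`, the field `progressLock`, and `isProgressLocked` are hypothetical; the diff above shows only the method signature.

```scala
import java.util.concurrent.locks.ReentrantLock

/**
 * Hypothetical stand-in for the progress-locking part of a stream execution
 * (not Spark's StreamExecution). The micro-batch loop is assumed to hold
 * `progressLock` while it plans and runs each batch.
 */
class ProgressLocking {
  protected val progressLock = new ReentrantLock()

  /**
   * Runs `f` while the query is blocked from making progress: no batch can
   * run while `f` executes, so every change `f` makes to the sources becomes
   * visible to the same future batch. The lock is released even if `f`
   * throws, and because it is reentrant, nested calls on one thread are safe.
   */
  def withProgressLocked(f: => Unit): Unit = {
    progressLock.lock()
    try f finally progressLock.unlock()
  }

  /** True while some thread is inside `withProgressLocked`. */
  def isProgressLocked: Boolean = progressLock.isLocked
}
```

A reentrant lock is the natural choice in this sketch because an action performed under the lock may itself call `withProgressLocked` again (for example, when adding data to a source).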
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20650

[SPARK-23408][SS] Synchronize successive AddData actions in Streaming*JoinSuite

## What changes were proposed in this pull request?

The stream-stream join tests add data to multiple sources and expect all of it to show up in the next batch. However, there is a race condition: the next batch may be triggered after only one of the AddData actions has run.

A prior attempt to fix this by @jose-torres in #20646 tried to synchronize on all memory sources at once whenever consecutive AddData actions were found. However, that approach risks deadlock and can unintentionally change the behavior of the stress tests (see that PR for a detailed explanation). Instead, this PR does the following:

- A new action called `StreamProgressBlockedActions` allows multiple actions to be executed while the streaming query is blocked from making progress. This lets data be added to multiple sources and made visible together in the next batch.
- An alias of `StreamProgressBlockedActions` called `MultiAddData` is used explicitly in the `Streaming*JoinSuites` to add data to two memory sources simultaneously.

## How was this patch tested?

Modified the test cases in `Streaming*JoinSuites` that have consecutive `AddData` actions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23408

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20650

commit b4c3c55db394178f083d3eeaf537e407c026f0cd
Author: Tathagata Das
Date:   2018-02-21T10:48:15Z

    Fixed bug
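To make the race in the description concrete, the following sketch uses a hypothetical two-source stream whose background trigger thread runs batches back to back. `TwoSourceStream`, `addLeft`, `addRight`, and `stop` are illustrative assumptions, not Spark classes. Each batch holds the progress lock while it checks whether both sides have the same number of rows, so wrapping each pair of additions in `withProgressLocked` means no batch can observe one side of a pair without the other.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

/**
 * Hypothetical two-source stream (not Spark code). A background trigger thread
 * runs micro-batches back to back; each batch counts whether it saw a
 * "partial" state where only one side of a paired addition had arrived.
 */
final class TwoSourceStream {
  // Fair lock so the test thread is not starved by the busy trigger loop.
  private val progressLock = new ReentrantLock(true)
  private var leftRows = 0
  private var rightRows = 0
  private var batches = 0
  private var partialBatches = 0
  private val running = new AtomicBoolean(true)

  def withProgressLocked(f: => Unit): Unit = {
    progressLock.lock()
    try f finally progressLock.unlock()
  }

  def addLeft(): Unit = withProgressLocked { leftRows += 1 }
  def addRight(): Unit = withProgressLocked { rightRows += 1 }

  // One micro-batch: inspect both sources while holding the progress lock.
  private def runBatch(): Unit = withProgressLocked {
    batches += 1
    if (leftRows != rightRows) partialBatches += 1
  }

  private val trigger = new Thread(() => {
    runBatch() // always run at least one batch
    while (running.get()) runBatch()
  })

  def start(): Unit = trigger.start()

  /** Stops the trigger thread and returns (batches, partialBatches). */
  def stop(): (Int, Int) = {
    running.set(false)
    trigger.join() // join() makes the trigger thread's writes visible here
    (batches, partialBatches)
  }
}
```

Removing the outer `withProgressLocked` around each pair would likely let some batches see `leftRows != rightRows`, though how often depends on thread scheduling; that nondeterminism is the race condition the PR describes.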