[ https://issues.apache.org/jira/browse/SPARK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720719#comment-16720719 ]
ASF GitHub Bot commented on SPARK-23886: ---------------------------------------- asfgit closed pull request #23095: [SPARK-23886][SS] Update query status for ContinuousExecution URL: https://github.com/apache/spark/pull/23095 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 2cac86599ef19..f2dda0373c7ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -146,6 +146,12 @@ class MicroBatchExecution( logInfo(s"Query $prettyIdString was stopped") } + /** Begins recording statistics about query progress for a given trigger. */ + override protected def startTrigger(): Unit = { + super.startTrigger() + currentStatus = currentStatus.copy(isTriggerActive = true) + } + /** * Repeatedly attempts to run batches as data arrives. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 392229bcb5f55..a5fbb56e27099 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -114,7 +114,6 @@ trait ProgressReporter extends Logging { logDebug("Starting Trigger Calculation") lastTriggerStartTimestamp = currentTriggerStartTimestamp currentTriggerStartTimestamp = triggerClock.getTimeMillis() - currentStatus = currentStatus.copy(isTriggerActive = true) currentTriggerStartOffsets = null currentTriggerEndOffsets = null currentDurationsMs.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 4a7df731da67d..adbec0b00f368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -117,6 +117,8 @@ class ContinuousExecution( // For at least once, we can just ignore those reports and risk duplicates. commitLog.getLatest() match { case Some((latestEpochId, _)) => + updateStatusMessage("Starting new streaming query " + + s"and getting offsets from latest epoch $latestEpochId") val nextOffsets = offsetLog.get(latestEpochId).getOrElse { throw new IllegalStateException( s"Batch $latestEpochId was committed without end epoch offsets!") @@ -128,6 +130,7 @@ class ContinuousExecution( nextOffsets case None => // We are starting this stream for the first time. Offsets are all None. + updateStatusMessage("Starting new streaming query") logInfo(s"Starting new streaming query.") currentBatchId = 0 OffsetSeq.fill(continuousSources.map(_ => null): _*) @@ -260,6 +263,7 @@ class ContinuousExecution( epochUpdateThread.setDaemon(true) epochUpdateThread.start() + updateStatusMessage("Running") reportTimeTaken("runContinuous") { SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution) { @@ -319,6 +323,8 @@ class ContinuousExecution( * before this is called. */ def commit(epoch: Long): Unit = { + updateStatusMessage(s"Committing epoch $epoch") + assert(continuousSources.length == 1, "only one continuous source supported currently") assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index a0c9bcc8929eb..ca79e0248c06b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -28,9 +28,11 @@ import org.apache.spark.annotation.InterfaceStability * Reports information about the instantaneous status of a streaming query. * * @param message A human readable description of what the stream is currently doing. - * @param isDataAvailable True when there is new data to be processed. + * @param isDataAvailable True when there is new data to be processed. Doesn't apply + * to ContinuousExecution where it is always false. * @param isTriggerActive True when the trigger is actively firing, false when waiting for the - * next trigger time. + * next trigger time. Doesn't apply to ContinuousExecution where it is + * always false. * * @since 2.1.0 */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala new file mode 100644 index 0000000000000..10bea7f090571 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueryStatusAndProgressSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.continuous + +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream +import org.apache.spark.sql.streaming.Trigger + +class ContinuousQueryStatusAndProgressSuite extends ContinuousSuiteBase { + test("StreamingQueryStatus - ContinuousExecution isDataAvailable and isTriggerActive " + + "should be false") { + import testImplicits._ + + val input = ContinuousMemoryStream[Int] + + def assertStatus(stream: StreamExecution): Unit = { + assert(stream.status.isDataAvailable === false) + assert(stream.status.isTriggerActive === false) + } + + val trigger = Trigger.Continuous(100) + testStream(input.toDF(), useV2Sink = true)( + StartStream(trigger), + Execute(assertStatus), + AddData(input, 0, 1, 2), + Execute(assertStatus), + CheckAnswer(0, 1, 2), + Execute(assertStatus), + StopStream, + Execute(assertStatus), + AddData(input, 3, 4, 5), + Execute(assertStatus), + StartStream(trigger), + Execute(assertStatus), + CheckAnswer(0, 1, 2, 3, 4, 5), + Execute(assertStatus), + StopStream, + Execute(assertStatus)) + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > update query.status > ------------------- > > Key: SPARK-23886 > URL: https://issues.apache.org/jira/browse/SPARK-23886 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming > Affects Versions: 2.4.0 > Reporter: Jose Torres > Assignee: Gabor Somogyi > Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org