Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r157047178
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -0,0 +1,407 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.sql.{Dataset, SparkSession}
    +import org.apache.spark.sql.catalyst.encoders.RowEncoder
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.sources.v2.reader.Offset
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class MicroBatchExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: Sink,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  private val triggerExecutor = trigger match {
    +    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
    +    case OneTimeTrigger => OneTimeExecutor()
    +    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
    +  }
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in QueryExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
    +    val _logicalPlan = analyzedPlan.transform {
    +      case streamingRelation@StreamingRelation(dataSource, _, output) =>
    +        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
    +          // Materialize source to avoid creating it in every batch
    +          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +          val source = dataSource.createSource(metadataPath)
    +          nextSourceId += 1
    +          // We still need to use the previous `output` instead of `source.schema` as attributes in
    +          // "df.logicalPlan" has already used attributes of the previous `output`.
    +          StreamingExecutionRelation(source, output)(sparkSession)
    +        })
    +    }
    +    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
    +    uniqueSources = sources.distinct
    +    _logicalPlan
    +  }
    +
    +  /**
    +   * Repeatedly attempts to run batches as data arrives.
    +   */
    +  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    triggerExecutor.execute(() => {
    +      startTrigger()
    +
    +      if (isActive) {
    +        reportTimeTaken("triggerExecution") {
    +          if (currentBatchId < 0) {
    +            // We'll do this initialization only once
    +            populateStartOffsets(sparkSessionForStream)
    +            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    +          } else {
    +            constructNextBatch()
    +          }
    +          if (dataAvailable) {
    +            currentStatus = currentStatus.copy(isDataAvailable = true)
    +            updateStatusMessage("Processing new data")
    +            runBatch(sparkSessionForStream)
    +          }
    +        }
    +        // Report trigger as finished and construct progress object.
    +        finishTrigger(dataAvailable)
    +        if (dataAvailable) {
    +          // Update committed offsets.
    +          commitLog.add(currentBatchId)
    +          committedOffsets ++= availableOffsets
    +          logDebug(s"batch ${currentBatchId} committed")
    +          // We'll increase currentBatchId after we complete processing current batch's data
    +          currentBatchId += 1
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +        } else {
    +          currentStatus = currentStatus.copy(isDataAvailable = false)
    +          updateStatusMessage("Waiting for data to arrive")
    +          Thread.sleep(pollingDelayMs)
    +        }
    +      }
    +      updateStatusMessage("Waiting for next trigger")
    +      isActive
    +    })
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the offset log) the offsets used to run the last batch
    +   *  IF last batch exists THEN
    +   *    Set the next batch to be executed as the last recovered batch
    +   *    Check the commit log to see which batch was committed last
    +   *    IF the last batch was committed THEN
    +   *      Call getBatch using the last batch start and end offsets
    +   *      // ^^^^ above line is needed since some sources assume last batch always re-executes
    +   *      Setup for a new batch i.e., start = last batch end, and identify new end
    +   *    DONE
    +   *  ELSE
    +   *    Identify a brand new batch
    +   *  DONE
    +   */
    +  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    +    offsetLog.getLatest() match {
    +      case Some((latestBatchId, nextOffsets)) =>
    +        /* First assume that we are re-executing the latest known batch
    +         * in the offset log */
    +        currentBatchId = latestBatchId
    +        availableOffsets = nextOffsets.toStreamProgress(sources)
    +        /* Initialize committed offsets to a committed batch, which at this point
    +         * is the second latest batch id in the offset log. */
    +        if (latestBatchId != 0) {
    +          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
    +            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
    +          }
    +          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
    +        }
    +
    +        // update offset metadata
    +        nextOffsets.metadata.foreach { metadata =>
    +          OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
    +          offsetSeqMetadata = OffsetSeqMetadata(
    +            metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
    +        }
    +
    +        /* identify the current batch id: if commit log indicates we successfully processed the
    +         * latest batch id in the offset log, then we can safely move to the next batch
    +         * i.e., committedBatchId + 1 */
    +        commitLog.getLatest() match {
    +          case Some((latestCommittedBatchId, _)) =>
    +            if (latestBatchId == latestCommittedBatchId) {
    +              /* The last batch was successfully committed, so we can safely process a
    +               * new next batch, but first:
    +               * make a call to getBatch using the offsets from the previous batch,
    +               * because certain sources (e.g., KafkaSource) assume on restart the last
    +               * batch will be executed before getOffset is called again. */
    +              availableOffsets.foreach { ao: (Source, Offset) =>
    +                val (source, end) = ao
    +                if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
    +                  val start = committedOffsets.get(source)
    +                  source.getBatch(start, end)
    +                }
    +              }
    +              currentBatchId = latestCommittedBatchId + 1
    +              committedOffsets ++= availableOffsets
    +              // Construct a new batch by recomputing availableOffsets
    +              constructNextBatch()
    +            } else if (latestCommittedBatchId < latestBatchId - 1) {
    +              logWarning(s"Batch completion log latest batch id is " +
    +                s"${latestCommittedBatchId}, which is not trailing " +
    +                s"batch id $latestBatchId by one")
    +            }
    +          case None => logInfo("no commit log present")
    +        }
    +        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
    +          s"$committedOffsets and available offsets $availableOffsets")
    +      case None => // We are starting this stream for the first time.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        constructNextBatch()
    +    }
    +  }
    +
    +  /**
    +   * Returns true if there is any new data available to be processed.
    +   */
    +  private def dataAvailable: Boolean = {
    +    availableOffsets.exists {
    +      case (source, available) =>
    +        committedOffsets
    +          .get(source)
    +          .map(committed => committed != available)
    +          .getOrElse(true)
    +    }
    +  }
    +
    +  /**
    +   * Queries all of the sources to see if any new data is available. When there is new data the
    +   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   */
    +  private def constructNextBatch(): Unit = {
    +    // Check to see what new data is available.
    +    val hasNewData = {
    +      awaitProgressLock.lock()
    +      try {
    +        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
    +          updateStatusMessage(s"Getting offsets from $s")
    +          reportTimeTaken("getOffset") {
    +            (s, s.getOffset)
    +          }
    +        }.toMap
    +        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
    +
    +        if (dataAvailable) {
    +          true
    +        } else {
    +          noNewData = true
    +          false
    +        }
    +      } finally {
    +        awaitProgressLock.unlock()
    +      }
    +    }
    +    if (hasNewData) {
    +      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
    +      // Update the eventTime watermarks if we find any in the plan.
    +      if (lastExecution != null) {
    +        lastExecution.executedPlan.collect {
    +          case e: EventTimeWatermarkExec => e
    +        }.zipWithIndex.foreach {
    +          case (e, index) if e.eventTimeStats.value.count > 0 =>
    +            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    +            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    +            val prevWatermarkMs = watermarkMsMap.get(index)
    +            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    +              watermarkMsMap.put(index, newWatermarkMs)
    +            }
    +
    +          // Populate 0 if we haven't seen any data yet for this watermark node.
    +          case (_, index) =>
    +            if (!watermarkMsMap.isDefinedAt(index)) {
    +              watermarkMsMap.put(index, 0)
    +            }
    +        }
    +
    +        // Update the global watermark to the minimum of all watermark nodes.
    +        // This is the safest option, because only the global watermark is fault-tolerant. Making
    +        // it the minimum of all individual watermarks guarantees it will never advance past where
    +        // any individual watermark operator would be if it were in a plan by itself.
    +        if (watermarkMsMap.nonEmpty) {
    +          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
    +          if (newWatermarkMs > batchWatermarkMs) {
    +            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    +            batchWatermarkMs = newWatermarkMs
    +          } else {
    +            logDebug(
    +              s"Event time didn't move: $newWatermarkMs < " +
    +                s"$batchWatermarkMs")
    +          }
    +        }
    +      }
    +      offsetSeqMetadata = offsetSeqMetadata.copy(
    +        batchWatermarkMs = batchWatermarkMs,
    +        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
    +
    +      updateStatusMessage("Writing offsets to log")
    +      reportTimeTaken("walCommit") {
    +        assert(offsetLog.add(
    +          currentBatchId,
    +          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
    +          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
    +        logInfo(s"Committed offsets for batch $currentBatchId. " +
    +          s"Metadata ${offsetSeqMetadata.toString}")
    +
    +        // NOTE: The following code is correct because runStream() processes exactly one
    +        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
    +        // the same time), this cleanup logic will need to change.
    +
    +        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
    +        // sources to discard data from the previous batch.
    +        if (currentBatchId != 0) {
    +          val prevBatchOff = offsetLog.get(currentBatchId - 1)
    +          if (prevBatchOff.isDefined) {
    +            prevBatchOff.get.toStreamProgress(sources).foreach {
    +              case (src, off) => src.commit(off)
    +            }
    +          } else {
    +            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
    +          }
    +        }
    +
    +        // It is now safe to discard the metadata beyond the minimum number to retain.
    +        // Note that purge is exclusive, i.e. it purges everything before the target ID.
    +        if (minLogEntriesToMaintain < currentBatchId) {
    +          offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
    +          commitLog.purge(currentBatchId - minLogEntriesToMaintain)
    +        }
    +      }
    +    } else {
    +      awaitProgressLock.lock()
    +      try {
    +        // Wake up any threads that are waiting for the stream to progress.
    +        awaitProgressLockCondition.signalAll()
    +      } finally {
    +        awaitProgressLock.unlock()
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Processes any data available between `availableOffsets` and `committedOffsets`.
    +   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    +   */
    +  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    +    // Request unprocessed data from all sources.
    +    newData = reportTimeTaken("getBatch") {
    +      availableOffsets.flatMap {
    +        case (source, available)
    +          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
    +          val current = committedOffsets.get(source)
    +          val batch = source.getBatch(current, available)
    +          assert(batch.isStreaming,
    +            s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
    +              s"${batch.queryExecution.logical}")
    +          logDebug(s"Retrieving data from $source: $current -> $available")
    +          Some(source -> batch)
    +        case _ => None
    +      }
    +    }
    +
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Replace sources in the logical plan with data that has arrived since the last batch.
    +    val withNewSources = logicalPlan transform {
    +      case StreamingExecutionRelation(source, output) =>
    +        newData.get(source).map { data =>
    +          val newPlan = data.logicalPlan
    +          assert(output.size == newPlan.output.size,
    +            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
    +              s"${Utils.truncatedString(newPlan.output, ",")}")
    +          replacements ++= output.zip(newPlan.output)
    +          newPlan
    +        }.getOrElse {
    +          LocalRelation(output, isStreaming = true)
    +        }
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case ct: CurrentTimestamp =>
    --- End diff --
    
    That's a strong candidate solution, but we're planning to simply not support CurrentTimestamp in the initial implementation. It would require significant changes, since control flow won't return to this point between epochs.
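    
    For reference, a minimal, self-contained sketch of the idea being discussed (plain Scala, all names hypothetical; this is not the code in this diff or Spark internals): the micro-batch driver pins one timestamp per trigger and rewrites `CurrentTimestamp` to that constant before running the batch, while a continuous query never returns to the driver between epochs, so there is no equivalent point at which to re-pin the value.
    
        object BatchTimestampSketch {
          // Toy stand-ins for Catalyst expressions, purely for illustration.
          sealed trait Expr
          case object CurrentTimestampExpr extends Expr
          final case class LiteralMs(ms: Long) extends Expr
    
          // Rewrite every CurrentTimestampExpr in the (toy) plan to the batch's pinned time.
          def rewriteForBatch(plan: Seq[Expr], batchTimestampMs: Long): Seq[Expr] =
            plan.map {
              case CurrentTimestampExpr => LiteralMs(batchTimestampMs)
              case other => other
            }
    
          def main(args: Array[String]): Unit = {
            // The micro-batch loop re-enters this rewrite once per trigger; an
            // epoch-based continuous query never comes back here, so the timestamp
            // cannot be re-pinned this way.
            val pinned = System.currentTimeMillis()
            println(rewriteForBatch(Seq(CurrentTimestampExpr), pinned))
          }
        }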

