Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20936#discussion_r183492587
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
    @@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator(
       private val partitionOffsets =
         mutable.Map[(Long, Int), PartitionOffset]()
     
    +  private var lastCommittedEpoch = startEpoch - 1
    +  // Remembers epochs that have to wait for previous epochs to be 
committed first.
    +  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
    +
       private def resolveCommitsAtEpoch(epoch: Long) = {
    -    val thisEpochCommits =
    -      partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
    +    val thisEpochCommits = findCommitsForEpoch(epoch)
         val nextEpochOffsets =
           partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
     
         if (thisEpochCommits.size == numWriterPartitions &&
           nextEpochOffsets.size == numReaderPartitions) {
    -      logDebug(s"Epoch $epoch has received commits from all partitions. 
Committing globally.")
    -      // Sequencing is important here. We must commit to the writer before 
recording the commit
    -      // in the query, or we will end up dropping the commit if we restart 
in the middle.
    -      writer.commit(epoch, thisEpochCommits.toArray)
    -      query.commit(epoch)
    -
    -      // Cleanup state from before this epoch, now that we know all 
partitions are forever past it.
    -      for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) 
{
    -        partitionCommits.remove(k)
    -      }
    -      for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) 
{
    -        partitionOffsets.remove(k)
    +
    +      // Check that last committed epoch is the previous one for 
sequencing of committed epochs.
    +      // If not, add the epoch being currently processed to epochs waiting 
to be committed,
    +      // otherwise commit it.
    +      if (lastCommittedEpoch != epoch - 1) {
    +        logDebug(s"Epoch $epoch has received commits from all partitions " 
+
    +          s"and is waiting for epoch ${epoch - 1} to be committed first.")
    +        epochsWaitingToBeCommitted.add(epoch)
    +      } else {
    +        commitEpoch(epoch, thisEpochCommits)
    +        lastCommittedEpoch = epoch
    +
    +        // Commit subsequent epochs that are waiting to be committed.
    +        var nextEpoch = lastCommittedEpoch + 1
    +        while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
    +          val nextEpochCommits = findCommitsForEpoch(nextEpoch)
    +          commitEpoch(nextEpoch, nextEpochCommits)
    +
    +          epochsWaitingToBeCommitted.remove(nextEpoch)
    +          lastCommittedEpoch = nextEpoch
    +          nextEpoch += 1
    +        }
    +
    +        // Cleanup state from before last committed epoch,
    +        // now that we know all partitions are forever past it.
    +        for (k <- partitionCommits.keys.filter { case (e, _) => e < 
lastCommittedEpoch }) {
    +          partitionCommits.remove(k)
    +        }
    +        for (k <- partitionOffsets.keys.filter { case (e, _) => e < 
lastCommittedEpoch }) {
    +          partitionOffsets.remove(k)
    +        }
           }
         }
       }
     
    +  private def findCommitsForEpoch(epoch: Long): 
Iterable[WriterCommitMessage] = {
    --- End diff --
    
Add docs explaining what this does? As is, it's hard to distinguish just 
from the name the difference between `findCommitsForEpoch` and `commitEpoch`. I 
think the term "commit" is overloaded here - `commit` in `findCommitsForEpoch` 
refers to per-partition commits, whereas `commit` in `commitEpoch` refers to 
committing the epoch to the offset log. Maybe it's better to differentiate more 
clearly: `commitEpoch` and `findPartitionCommitsForEpoch`. Adding docs to both 
methods would also help.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to