Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185316062
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
    +import org.apache.spark.sql.sources.v2.reader._
    +import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
    + * to read from the remote source, and polls that queue for incoming rows.
    + *
    + * Note that continuous processing calls compute() multiple times, and the 
same
    + * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
    + */
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  // When computing the same partition multiple times, we need to use the 
same data reader to
    +  // do so for continuity in offsets.
    +  @GuardedBy("dataReaders")
    +  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
    +    mutable.Map[Partition, ContinuousQueuedDataReader]()
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readerFactories.zipWithIndex.map {
    +      case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
    +    // If attempt number isn't 0, this is a task retry, which we don't 
support.
    +    if (context.attemptNumber() != 0) {
    +      throw new ContinuousTaskRetryException()
    +    }
    +
    +    val readerForPartition = dataReaders.synchronized {
    +      if (!dataReaders.contains(split)) {
    +        dataReaders.put(
    +          split,
    +          new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
    +      }
    +
    +      dataReaders(split)
    +    }
    +
    +    val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
    +    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private val POLL_TIMEOUT_MS = 1000
    +
    +      private var currentEntry: (UnsafeRow, PartitionOffset) = _
    +
    +      override def hasNext(): Boolean = {
    +        while (currentEntry == null) {
    +          if (context.isInterrupted() || context.isCompleted()) {
    +            currentEntry = (null, null)
    +          }
    +          if (readerForPartition.dataReaderFailed.get()) {
    +            throw new SparkException(
    +              "data read failed", 
readerForPartition.dataReaderThread.failureReason)
    +          }
    +          if (readerForPartition.epochPollFailed.get()) {
    +            throw new SparkException(
    +              "epoch poll failed", 
readerForPartition.epochPollRunnable.failureReason)
    +          }
    +          currentEntry = readerForPartition.queue.poll(POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)
    +        }
    +
    +        currentEntry match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              readerForPartition.currentEpoch,
    +              readerForPartition.currentOffset))
    +            readerForPartition.currentEpoch += 1
    +            currentEntry = null
    +            false
    --- End diff --
    
    I guess this is not that trivial to discuss in this PR, because it concerns 
about multiple stages which current continuous mode is trying to avoid.
    
    Btw, looks like RDD works just opposite way what another streaming 
frameworks work: downstream requests records from the upstream(s) instead of 
upstream flows records through the downstream(s).
    
    If my understanding is right, there's always a batch here by nature, 
regardless of its size. New ideas like "unbounded RDD" also may not work with 
multiple stages. We might be able to put major changes to the core to handle 
"epoch marker" naturally, but it is going to be another hack which both batch 
and micro-batch don't need to have. Moreover, by nature, RDD.compute() needs to 
be terminated to handle others like state checkpoint as well.
    
    That's the reason I asked about "the goal of continuous mode" to see the 
direction/plan. If the goal is targeted to "streaming", we might eventually end 
up with having another execution model. If the goal is targeted to "heavily 
latency-oriented optimized micro-batch", what we might want to focus is 
allowing tasks to be alive and handle multiple batches. I'm not sure it is 
possible without major changes (sadly not familiar with internal), but once we 
address it, it can be just "micro-batch" mode again, but with major 
improvements.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to