Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185357003
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
    +import org.apache.spark.sql.sources.v2.reader._
    +import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
    + * to read from the remote source, and polls that queue for incoming rows.
    + *
    + * Note that continuous processing calls compute() multiple times, and the 
same
    + * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
    + */
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  // When computing the same partition multiple times, we need to use the 
same data reader to
    +  // do so for continuity in offsets.
    +  @GuardedBy("dataReaders")
    +  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
    --- End diff --
    
    Does this whole reader map need to be serialized for every task? because as 
it is now, this whole this going to be serialized for every task. Per-partition 
objects like this should be passed through the RDDPartition object. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to