[GitHub] spark pull request #20397: [SPARK-23219][SQL]Rename ReadTask to DataReaderFa...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r167393259

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala ---
@@ -139,15 +139,15 @@ class RateStreamMicroBatchReader(options: DataSourceV2Options)
 outTimeMs += msPerPartitionBetweenRows
 }
- RateStreamBatchTask(packedRows).asInstanceOf[ReadTask[Row]]
+ RateStreamBatchTask(packedRows).asInstanceOf[DataReaderFactory[Row]]
 }.toList.asJava
 }
 override def commit(end: Offset): Unit = {}
 override def stop(): Unit = {}
 }
-case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] {
+case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends DataReaderFactory[Row] {
--- End diff --

and should we rename `RateStreamBatchTask` too?

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r167392663

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala ---
@@ -78,7 +78,7 @@ class RateSourceV2Suite extends StreamTest {
 val reader = new RateStreamMicroBatchReader(
 new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
 reader.setOffsetRange(Optional.empty(), Optional.empty())
-val tasks = reader.createReadTasks()
+val tasks = reader.createDataReaderFactories()
--- End diff --

nit: seems we should rename the variable too ..

---
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20397

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164425992

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java ---
@@ -22,21 +22,23 @@
 import org.apache.spark.annotation.InterfaceStability;
 /**
- * A read task returned by {@link DataSourceV2Reader#createReadTasks()} and is responsible for
- * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader}
+ * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is
+ * responsible for creating the actual data reader. The relationship between
+ * {@link DataReaderFactory} and {@link DataReader}
 * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
- * Note that, the read task will be serialized and sent to executors, then the data reader will be
- * created on executors and do the actual reading. So {@link ReadTask} must be serializable and
- * {@link DataReader} doesn't need to be.
+ * Note that, the reader factory will be serialized and sent to executors, then the data reader
+ * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
+ * serializable and {@link DataReader} doesn't need to be.
 */
 @InterfaceStability.Evolving
-public interface ReadTask extends Serializable {
+public interface DataReaderFactory extends Serializable {
 /**
- * The preferred locations where this read task can run faster, but Spark does not guarantee that
- * this task will always run on these locations. The implementations should make sure that it can
- * be run on any location. The location is a string representing the host name.
+ * The preferred locations where this data reader returned by this reader factory can run faster,
+ * but Spark does not guarantee that this task will always run on these locations.
--- End diff --

`not guarantee to always run the data reader on these locations.`

---
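The javadoc under review describes a serializable factory that is built on the driver and a reader that is created on executors, much like `Iterable` and `Iterator`. The plain-Java sketch below illustrates that split. `SketchReader`, `SketchReaderFactory`, `RangeReaderFactory` and `ReaderFactoryDemo` are hypothetical stand-ins, not Spark classes. A Java serialization round trip stands in for shipping the factory to an executor.

```java
import java.io.*;
import java.util.*;

// Hypothetical reader: holds per-scan cursor state and is never serialized (like Iterator).
interface SketchReader<T> extends Closeable {
    boolean next() throws IOException;
    T get();
}

// Hypothetical factory: small and serializable (like Iterable), shipped to executors.
interface SketchReaderFactory<T> extends Serializable {
    // Placement hint only; an empty array means "no preference".
    default String[] preferredLocations() {
        return new String[0];
    }

    SketchReader<T> createDataReader();
}

// Factory that only carries a half-open range [start, end).
class RangeReaderFactory implements SketchReaderFactory<Long> {
    private final long start;
    private final long end;

    RangeReaderFactory(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public SketchReader<Long> createDataReader() {
        return new SketchReader<Long>() {
            private long current = start - 1; // cursor state lives only in the reader

            @Override
            public boolean next() {
                current++;
                return current < end;
            }

            @Override
            public Long get() {
                return current;
            }

            @Override
            public void close() { }
        };
    }
}

class ReaderFactoryDemo {
    // Serialize, then deserialize: a stand-in for driver -> executor shipping.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // "Executor side": receive the factory, create the reader there, and drain it.
    static List<Long> readAll(SketchReaderFactory<Long> factory) {
        SketchReaderFactory<Long> shipped = roundTrip(factory);
        List<Long> rows = new ArrayList<>();
        try (SketchReader<Long> reader = shipped.createDataReader()) {
            while (reader.next()) {
                rows.add(reader.get());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(readAll(new RangeReaderFactory(0, 3))); // prints [0, 1, 2]
    }
}
```

Because the cursor lives only in the reader, only the factory has to cross the wire. This matches the javadoc's note that the factory must be serializable and `DataReader` doesn't need to be.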
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164425827

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java ---
@@ -22,21 +22,23 @@
 import org.apache.spark.annotation.InterfaceStability;
 /**
- * A read task returned by {@link DataSourceV2Reader#createReadTasks()} and is responsible for
- * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader}
+ * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is
+ * responsible for creating the actual data reader. The relationship between
+ * {@link DataReaderFactory} and {@link DataReader}
 * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
- * Note that, the read task will be serialized and sent to executors, then the data reader will be
- * created on executors and do the actual reading. So {@link ReadTask} must be serializable and
- * {@link DataReader} doesn't need to be.
+ * Note that, the reader factory will be serialized and sent to executors, then the data reader
+ * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
+ * serializable and {@link DataReader} doesn't need to be.
 */
 @InterfaceStability.Evolving
-public interface ReadTask extends Serializable {
+public interface DataReaderFactory extends Serializable {
 /**
- * The preferred locations where this read task can run faster, but Spark does not guarantee that
- * this task will always run on these locations. The implementations should make sure that it can
- * be run on any location. The location is a string representing the host name.
+ * The preferred locations where this data reader returned by this reader factory can run faster,
--- End diff --

`this data reader` -> `the data reader`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164425194

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

We mentioned it in the classdoc of `DataReaderFactory`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164349078

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java ---
@@ -21,9 +21,9 @@
 /**
 * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the {@link ReadTask}s that are returned by
- * {@link DataSourceV2Reader#createReadTasks()}. Note that this interface has nothing to do with
- * the data ordering inside one partition(the output records of a single {@link ReadTask}).
+ * be distributed among the {@link DataReaderFactory}s that are returned by
--- End diff --

`distributed among the data partition(one DataReader outputs data for one partition).`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164349108

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java ---
@@ -21,9 +21,9 @@
 /**
 * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the {@link ReadTask}s that are returned by
- * {@link DataSourceV2Reader#createReadTasks()}. Note that this interface has nothing to do with
- * the data ordering inside one partition(the output records of a single {@link ReadTask}).
+ * be distributed among the {@link DataReaderFactory}s that are returned by
+ * {@link DataSourceV2Reader#createDataReaderFactories()}. Note that this interface has nothing to do with
+ * the data ordering inside one partition(the output records of a single {@link DataReaderFactory}).
--- End diff --

a single DataReader

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164348810

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java ---
@@ -63,7 +63,7 @@
 StructType readSchema();
 /**
- * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
+ * Returns a list of reader factories. Each task is responsible for outputting data for one RDD
 * partition. That means the number of tasks returned here is same as the number of RDD
--- End diff --

`the number of factories ...`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164348145

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceV2Reader.java ---
@@ -63,7 +63,7 @@
 StructType readSchema();
 /**
- * Returns a list of read tasks. Each task is responsible for outputting data for one RDD
+ * Returns a list of reader factories. Each task is responsible for outputting data for one RDD
--- End diff --

`Each factory is ...`

---
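The javadoc being revised here says that each factory produces the data for exactly one RDD partition. Under that reading, the size of the returned list fixes the partition count. The sketch below shows this under explicit assumptions. `StridedFactory` and `PartitionPlanner.planFactories` are hypothetical names. The round-robin (strided) assignment of row values is also an assumption, chosen only for illustration. The values 11 and 33 merely echo the options passed in `RateSourceV2Suite`.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical factory for one partition: it only records which partition it serves.
class StridedFactory implements Serializable {
    final int partitionIndex;
    final int numPartitions;
    final long numRows;

    StridedFactory(int partitionIndex, int numPartitions, long numRows) {
        this.partitionIndex = partitionIndex;
        this.numPartitions = numPartitions;
        this.numRows = numRows;
    }

    // Values a reader created from this factory would emit:
    // partitionIndex, partitionIndex + numPartitions, ... (all below numRows).
    List<Long> values() {
        List<Long> out = new ArrayList<>();
        for (long v = partitionIndex; v < numRows; v += numPartitions) {
            out.add(v);
        }
        return out;
    }
}

class PartitionPlanner {
    // Driver side: returning N factories is what decides that there are N partitions.
    static List<StridedFactory> planFactories(int numPartitions, long numRows) {
        List<StridedFactory> factories = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            factories.add(new StridedFactory(i, numPartitions, numRows));
        }
        return factories;
    }

    public static void main(String[] args) {
        List<StridedFactory> factories = planFactories(11, 33);
        System.out.println(factories.size());          // prints 11
        System.out.println(factories.get(0).values()); // prints [0, 11, 22]
    }
}
```

In this design, the partition count is settled on the driver before any reader exists. Readers are only created later, from each shipped factory.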
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164348008

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java ---
@@ -50,7 +50,7 @@
 }
 /**
- * Returns a data reader to do the actual reading work for this read task.
+ * Returns a data reader to do the actual reading work for this data reader factory.
--- End diff --

we can follow the `DataWriterFactory` and just say `... to do the actual reading work`

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164347780

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java ---
@@ -22,19 +22,19 @@
 import org.apache.spark.annotation.InterfaceStability;
 /**
- * A read task returned by {@link DataSourceV2Reader#createReadTasks()} and is responsible for
- * creating the actual data reader. The relationship between {@link ReadTask} and {@link DataReader}
+ * A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is responsible for
+ * creating the actual data reader. The relationship between {@link DataReaderFactory} and {@link DataReader}
 * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
- * Note that, the read task will be serialized and sent to executors, then the data reader will be
- * created on executors and do the actual reading. So {@link ReadTask} must be serializable and
+ * Note that, the reader factory will be serialized and sent to executors, then the data reader will be
+ * created on executors and do the actual reading. So {@link DataReaderFactory} must be serializable and
 * {@link DataReader} doesn't need to be.
 */
 @InterfaceStability.Evolving
-public interface ReadTask extends Serializable {
+public interface DataReaderFactory extends Serializable {
 /**
- * The preferred locations where this read task can run faster, but Spark does not guarantee that
+ * The preferred locations where this data reader factory can run faster, but Spark does not guarantee that
--- End diff --

`... where the data reader returned by this reader factory can run faster ...`

---
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164347650

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

I would also mention this:
```
data reader creation must be done at executor side
```
so it makes it clearer why we need a list of `DataReaderFactory`s. But that's only my personal opinion; it's totally fine if we don't include the implementation details in the comment.

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164347542

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java ---
@@ -22,7 +22,7 @@
 /**
 * A concrete implementation of {@link Distribution}. Represents a distribution where records that
 * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link ReadTask}.
+ * {@link DataReaderFactory}.
--- End diff --

actually `DataReader` is more precise here.

---
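`ClusteredDistribution` promises that records sharing the same clustered-column values are produced by the same `DataReader`, that is, the same partition. The sketch below checks that property on keys that are already partitioned, and shows that hash partitioning satisfies it. It is a hypothetical illustration, not how Spark verifies distributions. The clustered columns are collapsed into a single `String` key, and `ClusteringCheck`, `isClustered` and `hashPartition` are made-up names.

```java
import java.util.*;

class ClusteringCheck {
    // True if no clustering key appears in more than one partition.
    static boolean isClustered(List<List<String>> partitions) {
        Map<String, Integer> owner = new HashMap<>();
        for (int p = 0; p < partitions.size(); p++) {
            for (String key : partitions.get(p)) {
                Integer prev = owner.putIfAbsent(key, p);
                if (prev != null && prev != p) {
                    return false; // same key produced by two different partitions
                }
            }
        }
        return true;
    }

    // Hash-partitions keys so that equal keys always land in the same partition.
    static List<List<String>> hashPartition(List<String> keys, int numPartitions) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (String k : keys) {
            parts.get(Math.floorMod(k.hashCode(), numPartitions)).add(k);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "b");
        System.out.println(isClustered(hashPartition(keys, 2))); // prints true
        System.out.println(isClustered(
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("a")))); // prints false
    }
}
```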
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164345010

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

I think we already did, in `DataSourceV2Reader`.

---
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164342995

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

I see, makes sense. Maybe update the comment to explain why we need a list of `DataReaderFactory`s?

---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164335994

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

`DataReaderFactory` is responsible for serialization and for initializing the actual data readers, so data reader creation must be done on the executor side. Before that, we need to determine how many RDD partitions we want, which is what this method does.

---
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r164248501

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java ---
@@ -30,21 +30,21 @@
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
 @Override
- default List createReadTasks() {
+ default List createDataReaderFactories() {
--- End diff --

We shall create only one `DataReaderFactory`, and have that create multiple data readers.

---
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r163921087

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
@@ -22,24 +22,24 @@
 import scala.reflect.ClassTag
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.ReadTask
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val readTask: ReadTask[T])
+class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T])
 extends Partition with Serializable
 class DataSourceRDD[T: ClassTag](
 sc: SparkContext,
-@transient private val readTasks: java.util.List[ReadTask[T]])
+@transient private val readerFactories: java.util.List[DataReaderFactory[T]])
 extends RDD[T](sc, Nil) {
 override protected def getPartitions: Array[Partition] = {
-readTasks.asScala.zipWithIndex.map {
+readerFactories.asScala.zipWithIndex.map {
 case (readTask, index) => new DataSourceRDDPartition(index, readTask)
--- End diff --

readTask -> readerFactory

---
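`getPartitions` pairs each factory with its position in the list, and the suggested `readTask` -> `readerFactory` rename applies to that pairing. Below is a rough Java equivalent of the `zipWithIndex` mapping. `FactoryPartition` and `PartitionMapping.toPartitions` are hypothetical stand-ins for `DataSourceRDDPartition` and the Scala code. The demo uses `String` values in place of real factories.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataSourceRDDPartition: a partition index plus its factory.
final class FactoryPartition<F> {
    final int index;
    final F readerFactory;

    FactoryPartition(int index, F readerFactory) {
        this.index = index;
        this.readerFactory = readerFactory;
    }
}

class PartitionMapping {
    // Java version of:
    // readerFactories.asScala.zipWithIndex.map { case (readerFactory, index) => ... }
    static <F> List<FactoryPartition<F>> toPartitions(List<F> readerFactories) {
        List<FactoryPartition<F>> partitions = new ArrayList<>(readerFactories.size());
        for (int index = 0; index < readerFactories.size(); index++) {
            partitions.add(new FactoryPartition<>(index, readerFactories.get(index)));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<FactoryPartition<String>> parts = toPartitions(Arrays.asList("f0", "f1", "f2"));
        for (FactoryPartition<String> p : parts) {
            System.out.println(p.index + " -> " + p.readerFactory);
        }
        // prints "0 -> f0", "1 -> f1", "2 -> f2" on separate lines
    }
}
```

In the Scala code, the list of factories is `@transient`. Each factory reaches the executors inside its own partition object, which is why `DataSourceRDDPartition` extends `Serializable`.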
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20397#discussion_r163920956

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -39,14 +39,14 @@
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 class ContinuousDataSourceRDD(
 sc: SparkContext,
 sqlContext: SQLContext,
-@transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+@transient private val readerFactories: java.util.List[DataReaderFactory[UnsafeRow]])
 extends RDD[UnsafeRow](sc, Nil) {
 private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
 private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
 override protected def getPartitions: Array[Partition] = {
-readTasks.asScala.zipWithIndex.map {
+readerFactories.asScala.zipWithIndex.map {
 case (readTask, index) => new DataSourceRDDPartition(index, readTask)
--- End diff --

readTask -> readerFactory

---
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/20397

[SPARK-23219][SQL]Rename ReadTask to DataReaderFactory in data source v2

## What changes were proposed in this pull request?

Currently the data source v2 reader has `ReadTask`, while the writer has `DataWriterFactory`. To make the naming consistent and clearer, this PR renames `ReadTask` to `DataReaderFactory`.

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark rename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20397.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #20397

commit ce1196e2e600cf6382ce3e721a4189c6324756e5
Author: Wang Gengliang
Date: 2018-01-25T15:54:47Z

rename ReadTask to DataReaderFactory

---