[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21200 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r186123363

    --- Diff: sql/core/pom.xml ---
    @@ -146,6 +146,11 @@
           <artifactId>parquet-avro</artifactId>
           <scope>test</scope>
         </dependency>
    +    <dependency>
    +      <groupId>org.mockito</groupId>
    +      <artifactId>mockito-core</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    --- End diff --

    I guess not. My IDE reported that I did.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r186019147

    --- Diff: sql/core/pom.xml ---
    @@ -146,6 +146,11 @@
           <artifactId>parquet-avro</artifactId>
           <scope>test</scope>
         </dependency>
    +    <dependency>
    +      <groupId>org.mockito</groupId>
    +      <artifactId>mockito-core</artifactId>
    +      <scope>test</scope>
    +    </dependency>
    --- End diff --

    Do you need this? Mockito is already present in test scope in spark/core/pom.xml, which is inherited by the spark/sql/core tests.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185901243

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The record types in a continuous processing buffer.
    + */
    +sealed trait ContinuousRecord
    +case object EpochMarker extends ContinuousRecord
    +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord
    +
    +/**
    + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
    + *
    + * This will be instantiated once per partition - successive calls to compute() in the
    + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
    + * offsets across epochs.
    + *
    + * The RDD is responsible for advancing two fields here, since they need to be updated in line
    + * with the data flow:
    + *  * currentOffset - contains the offset of the most recent row which a compute() iterator has sent
    + *    upwards. The RDD is responsible for advancing this.
    + *  * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing
    + *    this before ending the compute() iterator.
    + */
    +class ContinuousQueuedDataReader(
    +    factory: DataReaderFactory[UnsafeRow],
    +    context: TaskContext,
    +    dataQueueSize: Int,
    +    epochPollIntervalMs: Long) extends Closeable {
    +  private val reader = factory.createDataReader()
    +
    +  // Important sequencing - we must get our starting point before the provider threads start running
    +  var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +  var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    --- End diff --

    yes
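For readers following the thread, the buffer described in the class comment above (data rows interleaved with epoch markers in one blocking queue) can be illustrated with a small stand-alone sketch. This is not the PR's code: `Record`, `Marker`, `DataRow`, and `readEpoch` are hypothetical stand-ins, rows are plain strings, offsets are plain longs, and a poll timeout simply ends the epoch, whereas the reader in the diff keeps polling until the task is interrupted or completed.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-ins for the record types in the diff; the real ones carry
// an UnsafeRow and a PartitionOffset instead of a String and a Long.
sealed trait Record
case object Marker extends Record
final case class DataRow(value: String, offset: Long) extends Record

object EpochQueueSketch {
  /** Drains one epoch: the rows seen before the next marker, plus the last offset seen. */
  def readEpoch(
      queue: ArrayBlockingQueue[Record],
      startOffset: Long,
      pollTimeoutMs: Long): (Vector[String], Long) = {
    var rows = Vector.empty[String]
    var offset = startOffset
    var done = false
    while (!done) {
      queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS) match {
        // poll() returns null on timeout; this sketch treats that as end of input.
        case null => done = true
        // A marker closes the current epoch.
        case Marker => done = true
        // A data row is emitted and advances the tracked offset.
        case DataRow(value, rowOffset) =>
          rows :+= value
          offset = rowOffset
      }
    }
    (rows, offset)
  }

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[Record](8)
    Seq(DataRow("a", 1L), DataRow("b", 2L), Marker, DataRow("c", 3L)).foreach(queue.put)
    println(readEpoch(queue, 0L, 10L)) // first epoch: rows before the marker
    println(readEpoch(queue, 2L, 10L)) // second epoch: continues from the same queue
  }
}
```

Because the same queue instance is read across calls, the second call picks up exactly where the first one stopped, which is the continuity of offsets the class comment asks for.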
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185900415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185897924 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new NextIterator[UnsafeRow] { + override def getNext(): UnsafeRow = { +readerForPartition.next() match { + // epoch boundary marker + case EpochMarker => +epochEndpoint.send(ReportPartitionOffset( + context.partitionId(), + readerForPartition.currentEpoch, + readerForPartition.currentOffset))
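The sharing that the comments in this diff describe (one reader per partition, created on the first compute() call and reused by later calls) might be sketched as follows. All names here (`OffsetReader`, `PartitionSlot`, `readerFor`, `computeEpoch`) are hypothetical and only mirror the shape of the `queueReader` field; the reader is reduced to a counter so the effect on offsets is visible.

```scala
object SharedReaderSketch {
  // Hypothetical stand-in for ContinuousQueuedDataReader: it only remembers an offset.
  final class OffsetReader(var currentOffset: Long) {
    def nextRow(): Long = { currentOffset += 1; currentOffset }
  }

  // Hypothetical stand-in for ContinuousDataSourceRDDPartition: the reader slot starts
  // out empty and is filled by the first compute-like call.
  final class PartitionSlot(val index: Int) {
    var reader: OffsetReader = null
  }

  /** Creates the reader on first use and returns the same instance afterwards. */
  def readerFor(slot: PartitionSlot, startOffset: Long): OffsetReader = {
    if (slot.reader == null) slot.reader = new OffsetReader(startOffset)
    slot.reader
  }

  /** One compute-like call: reads `rows` rows through the shared reader. */
  def computeEpoch(slot: PartitionSlot, rows: Int): Seq[Long] = {
    val reader = readerFor(slot, startOffset = 0L)
    Seq.fill(rows)(reader.nextRow())
  }

  def main(args: Array[String]): Unit = {
    val slot = new PartitionSlot(0)
    println(computeEpoch(slot, 2)) // first call creates the reader
    println(computeEpoch(slot, 2)) // second call continues from the stored offset
  }
}
```

A plain `lazy val` would not work for the same reason the diff gives: the constructor inputs (here `startOffset`) are only known at the first call, not when the partition object is built.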
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185667167

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import org.apache.spark.{Partition, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
    +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * The RDD writing to a sink in continuous processing.
    + *
    + * Within each task, we repeatedly call prev.compute(). Each resulting iterator contains the data
    + * to be written for one epoch, which we commit and forward to the driver.
    + *
    + * We keep repeating prev.compute() and writing new epochs until the query is shut down.
    + */
    +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow])
    +    extends RDD[Unit](prev) {
    +
    +  override val partitioner = prev.partitioner
    +
    +  override def getPartitions: Array[Partition] = prev.partitions
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      SparkEnv.get)
    +    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +    do {
    +      var dataWriter: DataWriter[InternalRow] = null
    +      // write the data and commit this writer.
    +      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    +        try {
    +          val dataIterator = prev.compute(split, context)
    +          dataWriter = writeTask.createDataWriter(
    +            context.partitionId(), context.attemptNumber(), currentEpoch)
    +          while (dataIterator.hasNext) {
    +            dataWriter.write(dataIterator.next())
    +          }
    +          logInfo(s"Writer for partition ${context.partitionId()} " +
    +            s"in epoch $currentEpoch is committing.")
    +          val msg = dataWriter.commit()
    +          epochCoordinator.send(
    +            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
    +          )
    +          logInfo(s"Writer for partition ${context.partitionId()} " +
    +            s"in epoch $currentEpoch committed.")
    +          currentEpoch += 1
    +        } catch {
    +          case _: InterruptedException =>
    +          // Continuous shutdown always involves an interrupt. Just finish the task.
    +        }
    +      })(catchBlock = {
    +        // If there is an error, abort this writer. We enter this callback in the middle of
    +        // rethrowing an exception, so compute() will stop executing at this point.
    +        logError(s"Writer for partition ${context.partitionId()} is aborting.")
    +        if (dataWriter != null) dataWriter.abort()
    +        logError(s"Writer for partition ${context.partitionId()} aborted.")
    +      })
    +    } while (!context.isInterrupted() && !context.isCompleted())
    --- End diff --

    Why is this `do...while` instead of `while`?
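The practical difference behind this question is that a do...while body always runs at least once, even when the stop condition already holds on entry, while a plain while loop may run zero times. The sketch below makes that visible with a simple counter. It is an illustration only and does not claim to be the author's reason for the choice: `doWhile`, `epochsStartedDoWhile`, and `epochsStartedWhile` are hypothetical helpers, the `alreadyStopped` flag stands in for `context.isInterrupted() || context.isCompleted()`, and the cap of three epochs exists only so the loops terminate. The do...while shape is spelled out as a helper so the block compiles on both Scala 2 and Scala 3, which no longer accepts the `do ... while` syntax.

```scala
object LoopShapeSketch {
  // Equivalent of `do { body } while (cond)`: run the body once, then loop on cond.
  def doWhile(body: => Unit)(cond: => Boolean): Unit = {
    body
    while (cond) body
  }

  /** Epochs started when the loop has the do...while shape used in the diff. */
  def epochsStartedDoWhile(alreadyStopped: Boolean): Int = {
    var started = 0
    doWhile { started += 1 } (!alreadyStopped && started < 3)
    started
  }

  /** Epochs started when the same condition guards a plain while loop. */
  def epochsStartedWhile(alreadyStopped: Boolean): Int = {
    var started = 0
    while (!alreadyStopped && started < 3) started += 1
    started
  }

  def main(args: Array[String]): Unit = {
    println(epochsStartedDoWhile(alreadyStopped = true))
    println(epochsStartedWhile(alreadyStopped = true))
  }
}
```

With the flag set on entry, the do...while shape still starts one epoch and the while shape starts none; when the flag is clear, both shapes start the same number of epochs.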
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185666924

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import org.apache.spark.{Partition, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
    +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * The RDD writing to a sink in continuous processing.
    + *
    + * Within each task, we repeatedly call prev.compute(). Each resulting iterator contains the data
    + * to be written for one epoch, which we commit and forward to the driver.
    + *
    + * We keep repeating prev.compute() and writing new epochs until the query is shut down.
    + */
    +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow])
    +    extends RDD[Unit](prev) {
    +
    +  override val partitioner = prev.partitioner
    +
    +  override def getPartitions: Array[Partition] = prev.partitions
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      SparkEnv.get)
    +    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +    do {
    +      var dataWriter: DataWriter[InternalRow] = null
    +      // write the data and commit this writer.
    +      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    +        try {
    +          val dataIterator = prev.compute(split, context)
    +          dataWriter = writeTask.createDataWriter(
    +            context.partitionId(), context.attemptNumber(), currentEpoch)
    +          while (dataIterator.hasNext) {
    +            dataWriter.write(dataIterator.next())
    +          }
    +          logInfo(s"Writer for partition ${context.partitionId()} " +
    +            s"in epoch $currentEpoch is committing.")
    +          val msg = dataWriter.commit()
    +          epochCoordinator.send(
    +            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
    +          )
    --- End diff --

    nit: move to prev line.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185672429 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185666101 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185672068 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185663614 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185672320 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185661460 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185666390 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong --- End diff -- Is `currentEpoch` used anywhere?
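For readers following the thread, here is a rough sketch of the pattern the quoted Scaladoc describes: a single bounded queue carries both rows and epoch markers, and the consumer advances an offset on each row and an epoch counter on each marker. This is not the code from the PR. `Record`, `Marker`, `Row`, `EpochSketch` and `drain` are hypothetical stand-ins, the row payload is simplified to a `String` and the offset to a `Long` so it runs without Spark, and ending the loop on a poll timeout is an assumption made only to keep the example finite.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-ins for the record types quoted in the diff.
sealed trait Record
case object Marker extends Record
final case class Row(value: String, offset: Long) extends Record

object EpochSketch {
  // Consumes the queue until a poll times out.
  // A Row advances the offset; a Marker closes the current epoch.
  def drain(
      queue: ArrayBlockingQueue[Record],
      startEpoch: Long,
      startOffset: Long): (Long, Long) = {
    var currentEpoch = startEpoch
    var currentOffset = startOffset
    var done = false
    while (!done) {
      // poll returns null when nothing arrives within the timeout.
      queue.poll(100, TimeUnit.MILLISECONDS) match {
        case null => done = true
        case Marker => currentEpoch += 1
        case Row(_, offset) => currentOffset = offset
      }
    }
    (currentEpoch, currentOffset)
  }

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[Record](8)
    queue.put(Row("a", 1L))
    queue.put(Row("b", 2L))
    queue.put(Marker)
    queue.put(Row("c", 3L))
    val (epoch, offset) = drain(queue, startEpoch = 0L, startOffset = 0L)
    println(s"epoch=$epoch offset=$offset")
  }
}
```

In the quoted class the two counters are public `var`s that the RDD is documented as advancing, which is why it matters whether anything outside the reader actually touches `currentEpoch`.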
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185665215 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185670635 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185664066 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185668239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { --- End diff -- This condition can be deduplicated into a method with a shorter name, such as `shouldStop`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional
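A minimal sketch of the dedup suggested in the comment above. Only the name `shouldStop` comes from the review. `StopSignal` and `PollingReader` are hypothetical stand-ins for `TaskContext` and the queued reader, and `String` stands in for `ContinuousRecord`, so that the snippet runs without Spark on the classpath. It is not the code that was merged.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-in for the two TaskContext checks used in the diff.
trait StopSignal {
  def isInterrupted(): Boolean
  def isCompleted(): Boolean
}

// Hypothetical reduced reader: only the polling loop of next() is modelled.
class PollingReader(context: StopSignal, queue: ArrayBlockingQueue[String]) {
  private val PollTimeoutMs = 10L

  // The repeated condition, factored into one short, named method.
  private def shouldStop(): Boolean =
    context.isInterrupted() || context.isCompleted()

  // Returns None when the task is stopping, otherwise the next queued entry.
  def next(): Option[String] = {
    var entry: String = null
    while (entry == null) {
      if (shouldStop()) return None
      entry = queue.poll(PollTimeoutMs, TimeUnit.MILLISECONDS)
    }
    Some(entry)
  }
}
```

Any other place that tests the same two flags can call `shouldStop()` as well, so the stop condition is defined in one place.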
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185671213 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) --- End diff -- epochCoordEndpoint --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185665512 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) --- End diff -- epochCoordEndpoint --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185662912 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset --- End diff -- As I commented above, currentOffset does not need to be exposed at all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
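One way the encapsulation asked for above could look, as a sketch only. It assumes the reader advances the offset itself inside `next()` and reports it at epoch boundaries, which the thread does not spell out. `Record`, `Marker`, `DataRow`, `EncapsulatedReader` and the `report` callback are hypothetical names, with `Long` and `String` standing in for `PartitionOffset` and `UnsafeRow`.

```scala
// Hypothetical simplified record types (not the Spark classes).
sealed trait Record
case object Marker extends Record
final case class DataRow(value: String, offset: Long) extends Record

// Hypothetical reader that keeps the offset private and advances it itself,
// so callers never read or write currentOffset directly.
class EncapsulatedReader(
    startOffset: Long,
    records: Iterator[Record],
    report: Long => Unit) {

  private var currentOffset: Long = startOffset

  // Returns the next row, or None at an epoch boundary or when input is exhausted.
  def next(): Option[String] = {
    if (!records.hasNext) return None
    records.next() match {
      case Marker =>
        // Epoch boundary: report the last offset handed out.
        report(currentOffset)
        None
      case DataRow(value, offset) =>
        currentOffset = offset
        Some(value)
    }
  }
}
```

With this shape the RDD only consumes rows, and the offset bookkeeping stays inside the reader.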
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185662378 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new NextIterator[UnsafeRow] { + override def getNext(): UnsafeRow = { +readerForPartition.next() match { + // epoch boundary marker + case EpochMarker => +epochEndpoint.send(ReportPartitionOffset( + context.partitionId(), + readerForPartition.currentEpoch, + readerForPartition.currentOffset)) +
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185666055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185663461 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185664956 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185671931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new NextIterator[UnsafeRow] { + override def getNext(): UnsafeRow = { +readerForPartition.next() match { + // epoch boundary marker + case EpochMarker => +epochEndpoint.send(ReportPartitionOffset( + context.partitionId(), + readerForPartition.currentEpoch, + readerForPartition.currentOffset)) +
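The epoch handling quoted above can be illustrated with a small standalone sketch. It is not the PR's code: `SketchRecord`, `SketchRow`, `SketchEpochMarker`, `SketchQueuedReader` and `computeEpoch` are hypothetical names, rows are plain Strings and offsets plain Longs, and the `ReportPartitionOffset` message to the epoch coordinator is left out. It only shows the idea from the doc comment: one iterator per epoch, with `currentOffset` advanced per row and `currentEpoch` incremented just before the iterator ends.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified stand-ins for the PR's record types: rows are Strings and
// offsets are Longs here, purely for illustration.
sealed trait SketchRecord
case object SketchEpochMarker extends SketchRecord
final case class SketchRow(row: String, offset: Long) extends SketchRecord

// Hypothetical per-partition state that outlives a single compute() call.
class SketchQueuedReader(val queue: ArrayBlockingQueue[SketchRecord]) {
  var currentOffset: Long = -1L
  var currentEpoch: Long = 0L
}

object EpochIteratorSketch {
  // One call models one compute(): it yields rows until the next epoch marker.
  def computeEpoch(reader: SketchQueuedReader): Iterator[String] = new Iterator[String] {
    private var finished = false
    private var buffered: Option[String] = None

    // Pulls at most one record from the queue when nothing is buffered.
    private def advance(): Unit = {
      if (!finished && buffered.isEmpty) {
        reader.queue.take() match {
          case SketchEpochMarker =>
            // Bump the epoch before ending this epoch's iterator.
            reader.currentEpoch += 1
            finished = true
          case SketchRow(row, offset) =>
            reader.currentOffset = offset
            buffered = Some(row)
        }
      }
    }

    override def hasNext: Boolean = { advance(); buffered.isDefined }

    override def next(): String = {
      advance()
      val row = buffered.getOrElse(throw new NoSuchElementException("epoch ended"))
      buffered = None
      row
    }
  }

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[SketchRecord](8)
    Seq[SketchRecord](SketchRow("a", 0L), SketchRow("b", 1L), SketchEpochMarker,
      SketchRow("c", 2L), SketchEpochMarker).foreach(r => queue.put(r))
    val reader = new SketchQueuedReader(queue)
    println(computeEpoch(reader).toList) // List(a, b)
    println(computeEpoch(reader).toList) // List(c)
    println((reader.currentEpoch, reader.currentOffset)) // (2,2)
  }
}
```

Because the reader object is shared across calls, the second `computeEpoch` picks up exactly where the first one stopped, which is the continuity property the quoted comments ask for.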
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185668041 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185665877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) --- End diff -- epochPoll -> epochGenerator
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185666240 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) --- End diff -- Are these flags really needed? Can't we simply check `dataReader.failureReason != null`?
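A rough sketch of what that suggestion could look like, under stated assumptions: `SketchReaderThread`, `FailureReasonSketch` and the `produce` callback are hypothetical names, not the PR's classes, and rows are plain Strings. The background thread records its own failure in a `@volatile` field, and the consumer checks that field for null instead of consulting a separate `AtomicBoolean`. This sketch hands out already-queued rows before reporting the failure; the ordering in the PR may differ.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-in for the PR's DataReaderThread; only the failure
// bookkeeping is modeled here.
class SketchReaderThread(queue: ArrayBlockingQueue[String], produce: () => String)
    extends Thread("sketch-reader") {
  // Written once by this thread, read by the consumer. @volatile makes the
  // write visible across threads without a separate AtomicBoolean flag.
  @volatile var failureReason: Throwable = _

  override def run(): Unit = {
    try {
      while (!isInterrupted()) queue.put(produce())
    } catch {
      case _: InterruptedException => // normal shutdown path, nothing to record
      case t: Throwable => failureReason = t
    }
  }
}

object FailureReasonSketch {
  // Polls the queue with a timeout and surfaces a recorded failure only
  // once no queued row is available.
  def next(queue: ArrayBlockingQueue[String], reader: SketchReaderThread): String = {
    var entry: String = null
    while (entry == null) {
      entry = queue.poll(100, TimeUnit.MILLISECONDS)
      if (entry == null && reader.failureReason != null) {
        throw new RuntimeException("data read failed", reader.failureReason)
      }
    }
    entry
  }

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[String](4)
    var calls = 0 // touched only by the reader thread
    val reader = new SketchReaderThread(queue, () => {
      calls += 1
      if (calls > 2) throw new IllegalStateException("source went away")
      s"row-$calls"
    })
    reader.setDaemon(true)
    reader.start()
    println(next(queue, reader)) // row-1
    println(next(queue, reader)) // row-2
    try next(queue, reader)
    catch { case e: RuntimeException => println(s"failed: ${e.getCause.getMessage}") }
  }
}
```

The trade-off is that the failure state and its cause live in one field, so they cannot drift apart, at the cost of requiring the field to be safely published across threads.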
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185664585 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185671576 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow + // iterator for the current epoch. + // Note that the iterator is also responsible for advancing some fields in the per-task + // reader that need to be shared across epochs. + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = { + val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition] + if (partition.queueReader == null) { +partition.queueReader = + new ContinuousQueuedDataReader( +partition.readerFactory, context, dataQueueSize, epochPollIntervalMs) + } + + partition.queueReader +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new NextIterator[UnsafeRow] { + override def getNext(): UnsafeRow = { +readerForPartition.next() match { + // epoch boundary marker + case EpochMarker => +epochEndpoint.send(ReportPartitionOffset( + context.partitionId(), + readerForPartition.currentEpoch, + readerForPartition.currentOffset)) +
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185663440 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185661542 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * The record types in a continuous processing buffer. 
+ */ +sealed trait ContinuousRecord +case object EpochMarker extends ContinuousRecord +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends ContinuousRecord + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * The RDD is responsible for advancing two fields here, since they need to be updated in line + * with the data flow: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + */ +class ContinuousQueuedDataReader( +factory: DataReaderFactory[UnsafeRow], +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = factory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + private val queue = new ArrayBlockingQueue[ContinuousRecord](dataQueueSize) + + private val epochPollFailed = new AtomicBoolean(false) + private val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochMarkerExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + private val epochMarkerGenerator = new EpochMarkerGenerator(queue, context, epochPollFailed) + 
epochMarkerExecutor.scheduleWithFixedDelay( +epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + private val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { +this.close() + }) + + def next(): ContinuousRecord = { +val POLL_TIMEOUT_MS = 1000 +var currentEntry: ContinuousRecord = null + +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185660517 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.{NextIterator, ThreadUtils} + +class ContinuousDataSourceRDDPartition( +val index: Int, +val readerFactory: DataReaderFactory[UnsafeRow]) + extends Partition with Serializable { + + // This is semantically a lazy val - it's initialized once the first time a call to + // ContinuousDataSourceRDD.compute() needs to access it, so it can be shared across + // all compute() calls for a partition. 
This ensures that one compute() picks up where the + // previous one ended. + // We don't make it actually a lazy val because it needs input which isn't available here. + // This will only be initialized on the executors. + private[continuous] var queueReader: ContinuousQueuedDataReader = _ +} + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. + * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +dataQueueSize: Int, +epochPollIntervalMs: Long, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory) +}.toArray + } + + // Initializes the per-task reader if not already done, and then produces the UnsafeRow --- End diff -- For class docs and method docs we use `/** ... */`. See **Code documentation style** in http://spark.apache.org/contributing.html
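To illustrate the style point in the comment above, here is a minimal sketch of a class and method documented with `/** ... */` blocks instead of `//` line comments. The names `PerTaskReader` and `nextRow` are hypothetical and do not come from the PR; only the comment style is the point.

```scala
/**
 * Hypothetical holder used only to show the doc-comment style.
 * Class-level documentation goes in a block like this one.
 */
class PerTaskReader(rows: Iterator[String]) {

  /**
   * Returns the next row, or None once the underlying iterator is exhausted.
   */
  def nextRow(): Option[String] =
    if (rows.hasNext) Some(rows.next()) else None

  // Plain line comments are still fine for implementation notes like this one.
}
```

Under this convention, the `// Initializes the per-task reader ...` comment in the quoted diff would become a `/** ... */` block placed directly above the method it describes.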
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185659743 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( --- End diff -- It's okay for multiple RDDs, one extending the other, to have different partition types.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185602618 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( --- End diff -- Following another suggestion, they no longer share a Partition type.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185602445 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/DataReaderThread.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.DataReader +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when + * a new row arrives to the [[DataReader]]. + */ +class DataReaderThread( --- End diff -- I think maybe it's better as is now that it's an inner class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185391845 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.util.Utils + +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow]) +extends RDD[Unit](prev) { + + override val partitioner = prev.partitioner + + override def getPartitions: Array[Partition] = prev.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[Unit] = { +val epochCoordinator = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + SparkEnv.get) +var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + +do { + var dataWriter: DataWriter[InternalRow] = null + // write the data and commit this writer. + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { +try { + val dataIterator = prev.compute(split, context) + dataWriter = writeTask.createDataWriter( +context.partitionId(), context.attemptNumber(), currentEpoch) + while (dataIterator.hasNext) { +dataWriter.write(dataIterator.next()) + } + logInfo(s"Writer for partition ${context.partitionId()} " + +s"in epoch $currentEpoch is committing.") + val msg = dataWriter.commit() + epochCoordinator.send( +CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) + ) + logInfo(s"Writer for partition ${context.partitionId()} " + +s"in epoch $currentEpoch committed.") + currentEpoch += 1 --- End diff -- Both nodes do their own independent tracking of currentEpoch. This is required; eventually they won't always be on the same machine. 
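The point about independent epoch tracking can be sketched as follows. This is a simplified, single-threaded illustration and not the PR's code: `ReaderSide`, `WriterSide`, `Record`, `Row` and `Marker` are hypothetical stand-ins, and the only assumption carried over from the diff is that both sides start from the same start epoch and each increments its own counter at an epoch boundary.

```scala
object IndependentEpochTracking {
  // Hypothetical record types standing in for rows and epoch markers.
  sealed trait Record
  case object Marker extends Record
  final case class Row(value: Int) extends Record

  /** Reader side: ends an epoch when it sees a marker, then bumps its own counter. */
  final class ReaderSide(startEpoch: Long, input: Iterator[Record]) {
    var currentEpoch: Long = startEpoch

    /** Returns the rows of one epoch; the marker is consumed and not returned. */
    def readEpoch(): List[Int] = {
      val rows = List.newBuilder[Int]
      var done = false
      while (!done && input.hasNext) {
        input.next() match {
          case Row(v) => rows += v
          case Marker => done = true
        }
      }
      currentEpoch += 1
      rows.result()
    }
  }

  /** Writer side: never reads the reader's counter; it counts its own commits. */
  final class WriterSide(startEpoch: Long) {
    var currentEpoch: Long = startEpoch

    /** Tags the rows with the writer's epoch, then advances that epoch. */
    def commit(rows: List[Int]): (Long, List[Int]) = {
      val committed = (currentEpoch, rows)
      currentEpoch += 1
      committed
    }
  }
}
```

Because neither side reads the other's counter, the two could live on different machines and still agree on epoch numbers, provided they start from the same value and see the same sequence of epoch boundaries.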
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185387562 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset --- End diff -- The data reader thread doesn't access this. As mentioned in the top-level comment, the task iterator thread is responsible for advancing it as it sees new rows. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
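The ownership rule described in the reply above (the data reader thread only enqueues, while the task iterator thread alone advances `currentOffset`) can be sketched roughly like this. It is a minimal illustration under assumptions: `QueuedRow`, `ProducerThread` and `Consumer` are hypothetical names, offsets are plain `Long` values rather than `PartitionOffset`, and only standard `java.util.concurrent` calls are used.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object SingleWriterOffset {
  // Hypothetical (row, offset) pair placed on the queue.
  final case class QueuedRow(value: String, offset: Long)

  /** Producer: pushes (row, offset) pairs and never touches the consumer's offset field. */
  final class ProducerThread(queue: ArrayBlockingQueue[QueuedRow], rows: Seq[String])
    extends Thread("hypothetical-data-reader") {
    override def run(): Unit =
      rows.zipWithIndex.foreach { case (r, i) => queue.put(QueuedRow(r, i.toLong)) }
  }

  /** Consumer: the only code that writes currentOffset, as it hands rows upward. */
  final class Consumer(queue: ArrayBlockingQueue[QueuedRow], startOffset: Long) {
    var currentOffset: Long = startOffset

    /** Polls with a timeout; returns None (and leaves the offset alone) if nothing arrived. */
    def next(timeoutMs: Long): Option[String] =
      Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS)).map { qr =>
        currentOffset = qr.offset
        qr.value
      }
  }
}
```

Since only the consuming thread ever writes `currentOffset`, the field needs no lock in this sketch; the blocking queue is the sole hand-off point between the two threads.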
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185387218 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. 
The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious generalization of this logic to multiple stages won't work. It's +// invalid to send an epoch marker from the bottom of a task if all its child tasks +// haven't sent one. +currentEntry = (null, null)
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185387056 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. 
The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious generalization of this logic to multiple stages won't work. It's --- End diff -- The bottommost RDD within a task. Really this is just intended as a pointer to not copy this logic blindly as we generalize the execution. I can pare
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185365085 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. 
The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious generalization of this logic to multiple stages won't work. It's +// invalid to send an epoch marker from the bottom of a task if all its child tasks +// haven't sent one. +currentEntry = (null, null) +
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185371525 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.util.Utils + +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow]) --- End diff -- add docs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185372507 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark.{Partition, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.util.Utils + +class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow]) +extends RDD[Unit](prev) { + + override val partitioner = prev.partitioner + + override def getPartitions: Array[Partition] = prev.partitions + + override def compute(split: Partition, context: TaskContext): Iterator[Unit] = { +val epochCoordinator = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + SparkEnv.get) +var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + +do { + var dataWriter: DataWriter[InternalRow] = null + // write the data and commit this writer. + Utils.tryWithSafeFinallyAndFailureCallbacks(block = { +try { + val dataIterator = prev.compute(split, context) + dataWriter = writeTask.createDataWriter( +context.partitionId(), context.attemptNumber(), currentEpoch) + while (dataIterator.hasNext) { +dataWriter.write(dataIterator.next()) + } + logInfo(s"Writer for partition ${context.partitionId()} " + +s"in epoch $currentEpoch is committing.") + val msg = dataWriter.commit() + epochCoordinator.send( +CommitPartitionEpoch(context.partitionId(), currentEpoch, msg) + ) + logInfo(s"Writer for partition ${context.partitionId()} " + +s"in epoch $currentEpoch committed.") + currentEpoch += 1 --- End diff -- I am having trouble tracking how the currentEpoch is updated and used. Is this field `currentEpoch` used anywhere outside this class? The `ContinuousQueuedDataReader` also has currentEpoch being incremented. 
I am confused about what is used where. Can't we converge the different flags into a common thread-local variable that is initialized from the local property, incremented in one place (say, by this writer class), and used everywhere?
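A minimal sketch of the single-source-of-truth idea suggested above, assuming a helper object that is not part of this PR. The name `EpochTracker` and its methods are hypothetical, chosen only for illustration; the starting value would come from the `START_EPOCH_KEY` local property that the diff already reads.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper, not taken from the PR: one epoch counter per thread.
object EpochTracker {
  // Each thread gets its own counter; -1 means "not initialized yet".
  private val currentEpoch = new ThreadLocal[AtomicLong] {
    override protected def initialValue(): AtomicLong = new AtomicLong(-1L)
  }

  /** Set the starting epoch, e.g. from the START_EPOCH_KEY local property. */
  def initializeCurrentEpoch(startEpoch: Long): Unit =
    currentEpoch.get().set(startEpoch)

  /** Read the current epoch, or None if this thread has not been initialized. */
  def getCurrentEpoch: Option[Long] = {
    val epoch = currentEpoch.get().get()
    if (epoch < 0) None else Some(epoch)
  }

  /** Advance the epoch; meant to be called from exactly one place (say, the writer). */
  def incrementCurrentEpoch(): Unit = currentEpoch.get().incrementAndGet()
}
```

One caveat with this sketch: a plain `ThreadLocal` is only visible on the thread that set it, so background threads such as the data reader and epoch poll threads would need an `InheritableThreadLocal` or an explicit hand-off to see the same counter.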
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185351224 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { --- End diff -- I think it's a bad idea to have sqlContext in the constructor, because this can accidentally serialize the SQLContext, and we don't want that. In fact, if you only need a few confs from the SQLContext, then maybe we should have only those in the constructor.
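To illustrate the serialization concern, here is a small self-contained sketch that uses plain Java serialization rather than Spark. All class names in it (`FakeSqlContext`, `RddWithContext`, `RddWithConfs`, `SerializationCheck`) are hypothetical stand-ins, not types from the PR. It shows how a constructor parameter that is referenced from a method is kept as a field and dragged into serialization, while passing only the needed values avoids that.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a heavyweight, non-serializable driver-side context (hypothetical).
class FakeSqlContext {
  val queueSize: Int = 1024
  val pollIntervalMs: Long = 100L
}

// Referencing `ctx` from a method makes the compiler keep it as a field,
// so serializing this object also tries to serialize the context.
class RddWithContext(ctx: FakeSqlContext) extends Serializable {
  def queueSize: Int = ctx.queueSize
}

// Carries only the two values it needs, read once on the driver.
class RddWithConfs(val dataQueueSize: Int, val epochPollIntervalMs: Long)
  extends Serializable

object SerializationCheck {
  /** Returns true if Java serialization of `obj` succeeds. */
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Under these assumptions, `SerializationCheck.canSerialize(new RddWithContext(new FakeSqlContext))` fails while `SerializationCheck.canSerialize(new RddWithConfs(1024, 100L))` succeeds, which is the reason for narrowing the constructor to the confs alone.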
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185364497 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + // This queue contains two types of messages: + // * (null, null) representing an epoch boundary. + // * (row, off) containing a data row and its corresponding PartitionOffset. 
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) + + val epochPollFailed = new AtomicBoolean(false) + val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed) --- End diff -- Why is this a public val?
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185365453 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: --- End diff -- This can be much more strongly encapsulated. There is no need to expose `queue`, `epochPollFailed` and `dataReaderFailed`. See comment in the RDD class.
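One possible shape for the stronger encapsulation asked for above, as a sketch only. The types `Record`, `EpochMarker`, `DataRow` and `QueuedReader` are hypothetical and simplified; this is not the PR's implementation. The queue and the failure state stay private, and the consumer sees a single blocking `next()` call.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical record types replacing the (null, null) epoch-marker convention.
sealed trait Record[+T]
case object EpochMarker extends Record[Nothing]
final case class DataRow[+T](value: T) extends Record[T]

class QueuedReader[T](capacity: Int, pollTimeoutMs: Long) {
  // Neither the queue nor the failure state is visible to callers.
  private val queue = new ArrayBlockingQueue[Record[T]](capacity)
  private val failure = new AtomicReference[Throwable]()

  // Entry points for the producer threads (data reader and epoch poller).
  def offerRow(value: T): Unit = queue.put(DataRow(value))
  def offerEpochMarker(): Unit = queue.put(EpochMarker)
  def reportFailure(t: Throwable): Unit = failure.compareAndSet(null, t)

  /** Blocks until a record arrives; returns None at an epoch boundary. */
  def next(): Option[T] = {
    var result: Record[T] = null
    while (result == null) {
      // Surface background failures instead of making callers check flags.
      val t = failure.get()
      if (t != null) throw new RuntimeException("background thread failed", t)
      result = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
    }
    result match {
      case DataRow(v) => Some(v)
      case EpochMarker => None
    }
  }
}
```

With this shape the RDD's iterator no longer needs to poll the queue or inspect `epochPollFailed` and `dataReaderFailed` itself; it calls `next()` and ends the epoch when it gets `None`.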
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185357003 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = --- End diff -- Does this whole reader map need to be serialized for every task? As it is now, the whole thing is going to be serialized for every task. Per-partition objects like this should be passed through the RDDPartition object.
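A rough sketch of keeping the per-partition reader on the partition object instead of in an RDD-level map. `QueuedReaderStub` and `ReaderPartition` are hypothetical stand-ins; Spark's own `Partition` trait and the real reader are deliberately not used here, so this only illustrates the pattern.

```scala
// Hypothetical stand-in for the per-partition queued reader.
class QueuedReaderStub(val partitionIndex: Int)

class ReaderPartition(val index: Int) extends Serializable {
  // Transient, so it is never shipped with the task; it is created on first
  // use and then reused by later compute() calls on the same partition object.
  @transient private var reader: QueuedReaderStub = null

  def getOrCreateReader(): QueuedReaderStub = synchronized {
    if (reader == null) {
      reader = new QueuedReaderStub(index)
    }
    reader
  }
}
```

Repeated calls to `getOrCreateReader()` on one partition instance return the same reader, which is the continuity property the original map was meant to provide, without serializing a map of readers for every task.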
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185371024 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( --- End diff -- This class is best understood only when you see both `DataReaderThread` and `EpochPollRunnable` code. And these classes share a lot of objects between themselves (flags, taskcontext, etc.). So I think it makes more sense to have the `DataReaderThread` and `EpochPollRunnable` as inner classes of this `ContinuousQueuedDataReader` class. Would make the logic easier to follow. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185365888 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/DataReaderThread.scala --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.DataReader +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when + * a new row arrives to the [[DataReader]]. + */ +class DataReaderThread( --- End diff -- Rename to ContinuousDataReaderThread. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185356454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. 
The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious generalization of this logic to multiple stages won't work. It's --- End diff -- It's hard to make sense of what this means. What is the "bottom of a task"?
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185369491 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. + */ +class EpochPollRunnable( --- End diff -- It's hard to understand what "EpochPollRunnable" means. How about "EpochMarkerGenerator" instead?
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185364276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset --- End diff -- How is this synchronized? Isn't this accessed from the task iterator thread and the data reader thread? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
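The synchronization question above concerns a plain `var` that one thread writes and another reads. One common way to make such a field safely visible across threads is to hold it in an `AtomicReference`; a minimal sketch follows. `DemoOffset`, `OffsetHolder` and `OffsetHolderDemo` are hypothetical names invented for illustration, not Spark types, and this is not necessarily how the pull request resolved the comment.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for PartitionOffset; not the Spark type.
final case class DemoOffset(value: Long)

// Holds the latest offset so that a write on one thread is visible to reads on another.
final class OffsetHolder(initial: DemoOffset) {
  private val current = new AtomicReference[DemoOffset](initial)
  def get: DemoOffset = current.get()
  def set(offset: DemoOffset): Unit = current.set(offset)
}

object OffsetHolderDemo {
  // Writes offsets 1..100 from a second thread, then reads the last one from the caller.
  def lastOffsetAfterWrites(): DemoOffset = {
    val holder = new OffsetHolder(DemoOffset(0L))
    val writer = new Thread(new Runnable {
      override def run(): Unit = (1L to 100L).foreach(i => holder.set(DemoOffset(i)))
    })
    writer.start()
    writer.join() // wait for the writer so the value read below is the final one
    holder.get
  }
}
```

An `AtomicReference` only covers visibility of single reads and writes. If two threads needed to update the offset based on its previous value, a lock or `compareAndSet` loop would be needed instead.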
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185369742 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */ +class EpochPollRunnable( +queue: BlockingQueue[(UnsafeRow, PartitionOffset)], +context: TaskContext, +failedFlag: AtomicBoolean) + extends Thread with Logging { + private[continuous] var failureReason: Throwable = _ + + private val epochEndpoint = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get) + // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That + // field represents the epoch wrt the data being processed. The currentEpoch here is just a + // counter to ensure we send the appropriate number of markers if we fall behind the driver. + private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + override def run(): Unit = { +try { + val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch) + for (i <- currentEpoch to newEpoch - 1) { --- End diff -- I strongly suggest adding more docs here to explain this logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
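As a reading aid for the loop flagged above: going by the quoted class comment, the poller appears to enqueue one marker per epoch it has not yet announced, so that a task which fell behind the driver still emits the right number of markers. The sketch below illustrates that catch-up idea under this assumption. `EpochCatchUpSketch`, `EpochMarker` and `catchUp` are hypothetical names, and the real code enqueues `(null, null)` tuples rather than a marker object.

```scala
import java.util.concurrent.BlockingQueue

object EpochCatchUpSketch {
  // Hypothetical marker type; the quoted code uses (null, null) tuples instead.
  case object EpochMarker

  // Enqueues one marker for each epoch in [localEpoch, driverEpoch) and
  // returns the epoch the local counter should hold afterwards.
  def catchUp(
      localEpoch: Long,
      driverEpoch: Long,
      queue: BlockingQueue[EpochMarker.type]): Long = {
    // `localEpoch until driverEpoch` covers the same values as
    // `localEpoch to driverEpoch - 1` in the quoted loop.
    for (_ <- localEpoch until driverEpoch) {
      queue.put(EpochMarker)
    }
    math.max(localEpoch, driverEpoch)
  }
}
```

If the local counter is 3 and the driver reports 6, three markers are enqueued (for epochs 3, 4 and 5) and the counter becomes 6; a second poll that still sees 6 enqueues nothing.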
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185356694 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { --- End diff -- Please add docs on what this method does. This is a large method, and breaking it down into smaller internal methods may be beneficial (or at least documenting the sections). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185370511 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + // This queue contains two types of messages: + // * (null, null) representing an epoch boundary. + // * (row, off) containing a data row and its corresponding PartitionOffset. + val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) --- End diff -- Commented above, this does not need to be public. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185352237 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( --- End diff -- Why not just extend DataSourceRDD?? That would dedup quite a bit of the code related to `getPartitions` and `preferredLocations`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185358647 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +// Force the epoch to end here. 
The writer will notice the context is interrupted +// or completed and not start a new one. This makes it possible to achieve clean +// shutdown of the streaming query. +// TODO: The obvious generalization of this logic to multiple stages won't work. It's +// invalid to send an epoch marker from the bottom of a task if all its child tasks +// haven't sent one. +currentEntry = (null, null) +
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185353202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. +if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { --- End diff -- It might be slightly cleaner to implement this using spark.util.NextIterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
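For readers unfamiliar with the pattern behind this suggestion: a "next iterator" asks the subclass for one method that either returns the next element or signals the end, and derives `hasNext`/`next()` from it. Spark's own `NextIterator` is internal to Spark, so the sketch below uses a self-written stand-in, `SketchNextIterator`, which is an assumption for illustration and not the Spark class. The `Marker` sentinel and `untilMarker` helper are likewise hypothetical.

```scala
// Minimal stand-in modelled on the idea of a "next iterator"; not the Spark class itself.
abstract class SketchNextIterator[A] extends Iterator[A] {
  private var gotNext = false
  private var nextValue: A = _
  protected var finished = false

  // Subclasses return the next element, or set finished = true and return any value.
  protected def getNext(): A

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

object SketchNextIteratorDemo {
  // Hypothetical sentinel that plays the role of an epoch marker in this sketch.
  val Marker = "<epoch-marker>"

  // Yields queue entries until the sentinel is seen or the queue is empty.
  def untilMarker(queue: java.util.Queue[String]): Iterator[String] =
    new SketchNextIterator[String] {
      override protected def getNext(): String = {
        val entry = queue.poll() // returns null when the queue is empty
        if (entry == null || entry == Marker) finished = true
        entry
      }
    }
}
```

The appeal for a `compute()` iterator is that the polling, failure checks and end-of-epoch detection can live in a single `getNext()` body instead of being split between `hasNext` and `next()`.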
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185351152 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, --- End diff -- I think it's a bad idea to have sqlContext in the constructor because this can accidentally serialize the SQLContext, and we don't want that. In fact, if you only need a few confs from the SQLContext, then maybe we should only have those in the constructor. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
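The serialization concern above can be illustrated without Spark. In Scala, a constructor parameter that is only read in field initializers is not kept as a field, but one that is declared as a `val` or referenced from a method is, and then it travels with the object when it is serialized. The sketch below uses plain Java serialization; `DemoContext`, `CapturesContext`, `CapturesConfs` and `canSerialize` are hypothetical names for illustration only.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ConfCaptureSketch {
  // Hypothetical stand-in for a heavyweight, non-serializable context such as SQLContext.
  final class DemoContext(val queueSize: Int, val pollIntervalMs: Long)

  // Keeps a reference to the whole context, so serialization drags it along and fails.
  final class CapturesContext(val ctx: DemoContext) extends Serializable

  // Takes only the plain values it needs, so nothing else can be serialized by accident.
  final class CapturesConfs(val queueSize: Int, val pollIntervalMs: Long) extends Serializable

  // Returns true if Java serialization of obj succeeds, false if a field is not serializable.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Passing only the two configuration values, as the comment suggests, makes the safe behaviour independent of how the class body later evolves.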
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185334746 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r18520 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185329883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185329755 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185328820 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185326551 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185320119 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185317000 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) --- End diff -- Yeah, that's what I also missed. Thanks for correcting. :)
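For readers following the hasNext() loop quoted in this thread, here is a minimal standalone sketch of the general pattern it uses: poll a blocking queue with a timeout, and re-check a shutdown flag and a failure flag on every pass. This is not the code from the pull request and does not use any Spark classes. `PollingIterator`, `stopped`, `failure`, and `pollTimeoutMs` are hypothetical names chosen for illustration, and returning false as soon as the stop flag is set is an assumption of this sketch, not something the thread confirms about the final patch.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical stand-in for a queue-backed reader; none of these names come from Spark.
final class PollingIterator[T <: AnyRef](
    queue: BlockingQueue[T],
    stopped: AtomicBoolean,
    failure: AtomicReference[Throwable],
    pollTimeoutMs: Long) extends Iterator[T] {

  // Element fetched by hasNext and not yet handed out by next(); null means "nothing buffered".
  private var current: T = null.asInstanceOf[T]

  override def hasNext: Boolean = {
    while (current == null) {
      // Surface a producer-side failure to the consumer instead of waiting forever.
      val cause = failure.get()
      if (cause != null) throw new RuntimeException("producer failed", cause)
      // Leave the loop right away on shutdown, so the poll below cannot overwrite that decision.
      if (stopped.get()) return false
      // Bounded wait: on timeout poll returns null and the loop re-checks both flags.
      current = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
    }
    true
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator was stopped")
    val result = current
    current = null.asInstanceOf[T]
    result
  }
}
```

The bounded poll is the design point: an unbounded take() would leave the consumer unable to notice either flag while the queue is empty.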
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185316062 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185288754 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) --- End diff -- Oh! Yeah, that's definitely not intended. I don't want to fully rearrange, since we should still enable clean shutdown if the data reader or epoch poll threads have already been shut down by the earlier interrupt. But I can fix the logic here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
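A minimal sketch of the if/else shape discussed in this thread, not the code that was actually committed. It assumes plain JDK types in place of Spark's: `Entry`, `EpochMarker`, `isStopping` and `nextEntry` are hypothetical names standing in for `(UnsafeRow, PartitionOffset)`, the `(null, null)` marker, the `context.isInterrupted() || context.isCompleted()` check, and the body of `hasNext()`. The stop check comes first so that a shutdown still ends cleanly even if the helper threads have already been stopped, and the queue is polled only in the else branch so the assigned marker is not overwritten.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object PollLoopSketch {
  // Hypothetical stand-in for (UnsafeRow, PartitionOffset); not a Spark type.
  type Entry = (String, java.lang.Long)
  // An epoch boundary is represented as (null, null), as in the quoted diff.
  val EpochMarker: Entry = (null, null)

  def nextEntry(
      queue: BlockingQueue[Entry],
      isStopping: () => Boolean,
      dataReaderFailed: AtomicBoolean,
      epochPollFailed: AtomicBoolean,
      pollTimeoutMs: Long): Entry = {
    var currentEntry: Entry = null
    while (currentEntry == null) {
      if (isStopping()) {
        // Force the epoch to end; do not poll, so the marker is kept.
        currentEntry = EpochMarker
      } else {
        // Fail fast only when we are not already shutting down.
        if (dataReaderFailed.get()) throw new IllegalStateException("data read failed")
        if (epochPollFailed.get()) throw new IllegalStateException("epoch poll failed")
        // poll returns null on timeout, which sends us around the loop again.
        currentEntry = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
      }
    }
    currentEntry
  }
}
```

Whether the failure checks belong before or after the stop check is the design question raised above; this sketch takes the "stop check first" side.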
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185282844 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) --- End diff -- I meant that the current logic still calls queue.poll again instead of using the assigned epoch marker value, even when the if condition matches. That looks unintended, right? We can rearrange the logic to fail fast on the exception cases and use if-else to fix that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185268813 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala --- @@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } -val rdd = query.execute() +val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) +val messages = new Array[WriterCommitMessage](rdd.partitions.length) logInfo(s"Start processing data source writer: $writer. " + - s"The input RDD has ${rdd.getNumPartitions} partitions.") -// Let the epoch coordinator know how many partitions the write RDD has. + s"The input RDD has ${messages.length} partitions.") EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), -sparkContext.env) + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) try { // Force the RDD to run so continuous processing starts; no data is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - sparkContext.runJob( -rdd, -(context: TaskContext, iter: Iterator[InternalRow]) => - WriteToContinuousDataSourceExec.run(writerFactory, context, iter), -rdd.partitions.indices) + rdd.collect() } catch { case _: InterruptedException => -// Interruption is how continuous queries are ended, so accept and ignore the exception. + // Interruption is how continuous queries are ended, so accept and ignore the exception. case cause: Throwable => +logError(s"Data source writer $writer is aborting.") --- End diff -- (It was in an older version of WriteToContinuousDataSourceExec, and has since been removed.) 
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185268566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala --- @@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) } -val rdd = query.execute() +val rdd = new ContinuousWriteRDD(query.execute(), writerFactory) +val messages = new Array[WriterCommitMessage](rdd.partitions.length) logInfo(s"Start processing data source writer: $writer. " + - s"The input RDD has ${rdd.getNumPartitions} partitions.") -// Let the epoch coordinator know how many partitions the write RDD has. + s"The input RDD has ${messages.length} partitions.") EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), -sparkContext.env) + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) try { // Force the RDD to run so continuous processing starts; no data is actually being collected // to the driver, as ContinuousWriteRDD outputs nothing. - sparkContext.runJob( -rdd, -(context: TaskContext, iter: Iterator[InternalRow]) => - WriteToContinuousDataSourceExec.run(writerFactory, context, iter), -rdd.partitions.indices) + rdd.collect() } catch { case _: InterruptedException => -// Interruption is how continuous queries are ended, so accept and ignore the exception. + // Interruption is how continuous queries are ended, so accept and ignore the exception. case cause: Throwable => +logError(s"Data source writer $writer is aborting.") --- End diff -- Sorry, I rebased wrong. This change wasn't meant to be here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185266510 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + // This queue contains two types of messages: + // * (null, null) representing an epoch boundary. + // * (row, off) containing a data row and its corresponding PartitionOffset. 
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) + + val epochPollFailed = new AtomicBoolean(false) + val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed) + epochPollExecutor.scheduleWithFixedDelay( +epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { --- End diff -- Good point. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185266275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) --- End diff -- This isn't a no-op because it hooks into part of the writer. Added a comment clarifying what's happening. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185265321 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185264079 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */ +class EpochPollRunnable( +queue: BlockingQueue[(UnsafeRow, PartitionOffset)], +context: TaskContext, +failedFlag: AtomicBoolean) + extends Thread with Logging { + private[continuous] var failureReason: Throwable = _ + + private val epochEndpoint = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get) + // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That + // field represents the epoch wrt the data being processed. The currentEpoch here is just a + // counter to ensure we send the appropriate number of markers if we fall behind the driver. + private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + override def run(): Unit = { +try { + val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch) + for (i <- currentEpoch to newEpoch - 1) { --- End diff -- It can also happen if GetCurrentEpoch just takes a long time for some reason. I agree it'd make sense to add a check to ensure trigger interval is greater than executorPollIntervalMs. I'd even argue for some small multiplier on top of that poll interval. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
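The check proposed here could look roughly like the sketch below. Everything in it is an assumption for illustration: `EpochIntervalCheck`, `validate` and the default multiplier of 2 are hypothetical and are not part of Spark or of this pull request; the thread only agrees that the trigger interval should exceed the executor poll interval, possibly by some small factor.

```scala
object EpochIntervalCheck {
  // Hypothetical helper: rejects configurations where the trigger interval
  // is shorter than `multiplier` times the executor epoch poll interval.
  def validate(
      triggerIntervalMs: Long,
      executorPollIntervalMs: Long,
      multiplier: Int = 2): Unit = {
    require(multiplier >= 1, s"multiplier must be >= 1, got $multiplier")
    require(
      triggerIntervalMs >= executorPollIntervalMs * multiplier,
      s"trigger interval ($triggerIntervalMs ms) must be at least " +
        s"$multiplier x the executor poll interval ($executorPollIntervalMs ms)")
  }
}
```

`require` throws an `IllegalArgumentException` when the condition is false, so a misconfigured query would fail at start-up instead of silently reporting the same offsets for several epochs.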
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185259515 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */ +class EpochPollRunnable( +queue: BlockingQueue[(UnsafeRow, PartitionOffset)], +context: TaskContext, +failedFlag: AtomicBoolean) + extends Thread with Logging { + private[continuous] var failureReason: Throwable = _ + + private val epochEndpoint = EpochCoordinatorRef.get( + context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get) + // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That + // field represents the epoch wrt the data being processed. The currentEpoch here is just a + // counter to ensure we send the appropriate number of markers if we fall behind the driver. + private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + override def run(): Unit = { +try { + val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch) + for (i <- currentEpoch to newEpoch - 1) { --- End diff -- Yes, the queue getting full can be one cause; I think trigger interval < executorPollIntervalMs could be another. Anyway, I guess it would just cause the reader to report the same offsets for multiple epochs, which may be OK (but not desirable), since it will cause the epoch coordinator to block the other epochs from committing and then commit them one after the other when the commit message arrives for the missing partition. I'm not sure whether there are any checks to ensure trigger interval > executorPollIntervalMs. Maybe this should be added, or executorPollIntervalMs should be calculated from the trigger interval. I don't know the flow well enough to understand what happens when an executor crashes: how the epoch gets reset and how the newly launched tasks continue from the last successful commit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
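To make the catch-up behaviour under discussion concrete, here is a small sketch of the loop in the quoted diff using only JDK types. `EpochMarkerCatchUp`, `enqueueMarkers` and `Entry` are hypothetical names; the part of `run()` after the `for` line is not visible in the quoted diff, so advancing the counter to `newEpoch` is an assumption based on the code comment about sending "the appropriate number of markers".

```scala
import java.util.concurrent.BlockingQueue

object EpochMarkerCatchUp {
  // Hypothetical stand-in for (UnsafeRow, PartitionOffset); not a Spark type.
  type Entry = (String, java.lang.Long)
  private val marker: Entry = (null, null)

  // Enqueues one (null, null) marker per epoch the poller is behind and
  // returns the updated local epoch counter.
  def enqueueMarkers(
      queue: BlockingQueue[Entry],
      currentEpoch: Long,
      newEpoch: Long): Long = {
    // Same range as `currentEpoch to newEpoch - 1` in the diff.
    for (_ <- currentEpoch until newEpoch) {
      // put() blocks while a bounded queue is full.
      queue.put(marker)
    }
    math.max(currentEpoch, newEpoch)
  }
}
```

If the driver has advanced by three epochs since the last poll, three markers are queued back to back, which is the situation where the reader ends up reporting the same offset for several epochs.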
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185201032

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla
 case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
 }
-val rdd = query.execute()
+val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 logInfo(s"Start processing data source writer: $writer. " +
- s"The input RDD has ${rdd.getNumPartitions} partitions.")
-// Let the epoch coordinator know how many partitions the write RDD has.
+ s"The input RDD has ${messages.length} partitions.")
 EpochCoordinatorRef.get(
- sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-sparkContext.env)
+ sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+ sparkContext.env)
 .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
 try {
 // Force the RDD to run so continuous processing starts; no data is actually being collected
 // to the driver, as ContinuousWriteRDD outputs nothing.
- sparkContext.runJob(
-rdd,
-(context: TaskContext, iter: Iterator[InternalRow]) =>
- WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
-rdd.partitions.indices)
+ rdd.collect()
 } catch {
 case _: InterruptedException =>
-// Interruption is how continuous queries are ended, so accept and ignore the exception.
+ // Interruption is how continuous queries are ended, so accept and ignore the exception.
 case cause: Throwable =>
+logError(s"Data source writer $writer is aborting.")
--- End diff --

Could you please explain why the additional handling is needed here, given that ContinuousWriteRDD still handles the error case?
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r18524 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get)
+ // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That
+ // field represents the epoch wrt the data being processed. The currentEpoch here is just a
+ // counter to ensure we send the appropriate number of markers if we fall behind the driver.
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+ for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

Please correct me if I'm missing something. My understanding is that this situation (a gap bigger than 1) should only occur when the array queue gets full and blocks the epoch thread from putting a marker for longer than the trigger interval. Any other situation (an error case) should just crash the whole query so that recovery happens from scratch; that's why we can ignore the missing-epoch case.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185194384 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185198458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + // This queue contains two types of messages: + // * (null, null) representing an epoch boundary. + // * (row, off) containing a data row and its corresponding PartitionOffset. 
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val dataReaderFailed = new AtomicBoolean(false)
+
+ private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+
+ private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+s"epoch-poll--$coordinatorId--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
--- End diff --

Maybe better to just call `close` if `this` is visible.
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185187424 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) {
+ throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+ if (!dataReaders.contains(split)) {
+dataReaders.put(
+ split,
+ new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
+ }
+
+ dataReaders(split)
+}
+
+val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
+new Iterator[UnsafeRow] {
+ private val POLL_TIMEOUT_MS = 1000
+
+ private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+ override def hasNext(): Boolean = {
+while (currentEntry == null) {
+ if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

This line is effectively a no-op unless we exit the loop afterwards, so it would be better to clarify the intended behavior and fix it. I know this code block is the same as the existing code, so it might be off topic here. If we would like to address it in a separate issue, I'm happy to file one and work on it.
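One possible shape of the fix discussed above is sketched below, using plain `java.util.concurrent` types instead of Spark classes. `HasNextSketch`, `Entry`, and `nextEntry` are hypothetical names, the `isInterrupted` callback stands in for the `context.isInterrupted() || context.isCompleted()` check, and the assumption is that the original loop goes on to overwrite `currentEntry` with the result of a queue poll.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object HasNextSketch {
  // Hypothetical stand-in for (UnsafeRow, PartitionOffset); (null, null) models an epoch marker.
  type Entry = (String, java.lang.Long)
  val EpochMarker: Entry = (null, null)

  /**
   * Polls until an entry is available. When the task is interrupted or completed, the marker
   * is returned immediately, so it cannot be overwritten by a later poll() result.
   */
  def nextEntry(
      queue: ArrayBlockingQueue[Entry],
      isInterrupted: () => Boolean,
      pollTimeoutMs: Long): Entry = {
    var current: Entry = null
    while (current == null) {
      if (isInterrupted()) {
        return EpochMarker // leave the loop instead of falling through to poll()
      }
      current = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
    }
    current
  }
}
```

Whether an interrupted task should surface an epoch marker or simply stop iterating is a design question the thread leaves open; the sketch only shows how to make the assignment take effect.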
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185153219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get)
+ // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That
+ // field represents the epoch wrt the data being processed. The currentEpoch here is just a
+ // counter to ensure we send the appropriate number of markers if we fall behind the driver.
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+ for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

I don't think there's any need to trigger a recovery. The reader can (and currently will) just treat the epochs it missed as empty.
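To illustrate the behavior described above, here is a minimal sketch of enqueuing one marker per missed epoch, so that a gap larger than one shows up downstream as a run of empty epochs. It uses plain `java.util.concurrent` types; `EpochGapSketch`, `Entry`, and `enqueueMarkers` are hypothetical stand-ins for the `EpochPollRunnable` internals, and returning the updated counter is an assumption about the part of `run()` that the diff does not show.

```scala
import java.util.concurrent.ArrayBlockingQueue

object EpochGapSketch {
  // Hypothetical stand-in for (UnsafeRow, PartitionOffset); (null, null) models an epoch marker.
  type Entry = (String, java.lang.Long)
  val EpochMarker: Entry = (null, null)

  /**
   * Enqueues one marker for every epoch in [currentEpoch, newEpoch) and returns the updated
   * local counter. If newEpoch <= currentEpoch the range is empty and nothing is enqueued.
   */
  def enqueueMarkers(
      queue: ArrayBlockingQueue[Entry],
      currentEpoch: Long,
      newEpoch: Long): Long = {
    for (_ <- currentEpoch to newEpoch - 1) {
      queue.put(EpochMarker) // blocks while the queue is full
    }
    math.max(currentEpoch, newEpoch)
  }
}
```

Because consecutive markers carry no rows between them, a consumer that ends an epoch on each marker sees the skipped epochs as empty, which matches the claim that no recovery is needed.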
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185153197 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185147205 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.BlockingQueue +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset + +/** + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with + * (null, null) when a new epoch marker arrives. 
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get)
+ // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That
+ // field represents the epoch wrt the data being processed. The currentEpoch here is just a
+ // counter to ensure we send the appropriate number of markers if we fall behind the driver.
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+ for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

Can the difference between the "new" and "current" epoch be more than one? That would mean the reader missed some epochs, and maybe it should then trigger a recovery?
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185148972 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) + } + if (readerForPartition.dataReaderFailed.get()) { +throw new SparkException( + "data read failed", readerForPartition.dataReaderThread.failureReason) + } + if (readerForPartition.epochPollFailed.get()) { +throw new SparkException( + "epoch poll failed", readerForPartition.epochPollRunnable.failureReason) + } + currentEntry =
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/21200

    [SPARK-24039][SS] Do continuous processing writes with multiple compute() calls

    ## What changes were proposed in this pull request?

    Do continuous processing writes with multiple compute() calls.

    The current strategy is hacky; we just call next() on an iterator which has already returned hasNext = false, knowing that all the nodes we whitelist handle this properly. This will not work in the long term.

    Most of the changes here are just refactoring to accommodate the new model. The functional changes are:

    * The writer now calls prev.compute(split, context) once per epoch within the epoch loop.
    * ContinuousDataSourceRDD now spawns a ContinuousQueuedDataReader which is shared across multiple calls to compute() for the same partition.

    ## How was this patch tested?

    existing unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark noAggr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21200

commit 743419298c0a4bf98b4f547c3a6b3c9c86fdfacf
Author: Jose Torres
Date:   2018-03-16T21:00:40Z

    partial

commit 7c3c3e248c66506c66643e60c2b3e0f4f415e33c
Author: Jose Torres
Date:   2018-03-26T17:44:19Z

    rm old path

commit 49cd89ebf68af1fc45d642a64a69090b32ee1b19
Author: Jose Torres
Date:   2018-03-27T02:15:00Z

    format + docs

commit 23a436f911e7b99dfbb9c18794933cae8c1fe363
Author: Jose Torres
Date:   2018-04-18T21:59:23Z

    use agg

commit c9a074fe16d97d23ad8d0e9c64b65ac3718174ec
Author: Jose Torres
Date:   2018-03-27T02:16:28Z

    rename node

commit ec0e68df89cc183e661327c7894390be395b93ef
Author: Jose Torres
Date:   2018-03-30T00:59:36Z

    remove inheritance altogether

commit 7a4f1e72a3a139fee7980c54f312f30d8f738c04
Author: Jose Torres
Date:   2018-04-19T18:52:59Z

    rvrt stream writer

commit 3a4991aa3345d6c5b088586b388269878d7667d3
Author: Jose Torres
Date:   2018-04-19T18:55:50Z

    partial no rdd

commit 59710f6961040381344a4a8b297e061d275c4a83
Author: Jose Torres
Date:   2018-04-19T18:57:34Z

    include rdd

commit 6426185059b4c5ac526f2da5fc40a6b8433638ae
Author: Jose Torres
Date:   2018-04-19T18:59:32Z

    without shared task

commit 0c061f3e41f751bf78af1501b1c2764460ee9d7d
Author: Jose Torres
Date:   2018-04-19T19:47:09Z

    working without restart

commit 90049f962f60b5702afca31f723a1e0b2b06d094
Author: Jose Torres
Date:   2018-04-19T19:47:27Z

    include new file

commit 7463ac32ffa030e68cd0e5bdcba16c9b90687822
Author: Jose Torres
Date:   2018-04-20T18:04:41Z

    fix restarts

commit ccd2b380316b5bc6e073b448f49710d5bf2277ea
Author: Jose Torres
Date:   2018-04-30T16:47:54Z

    remove aggregate changes

commit 38498e3c9473b7ec90ea10c5edada60ee2a69769
Author: Jose Torres
Date:   2018-04-30T21:56:16Z

    cleanup naming and use map

commit ca545e490e11a69d8329f5f7605590339d419991
Author: Jose Torres
Date:   2018-04-30T22:06:07Z

    add docs

commit aee0cda5a0554015a28d48c9e6db756d53b8aa5f
Author: Jose Torres
Date:   2018-04-30T22:08:03Z

    remove unused class

commit d8f90b1b03cc9eed1ebcec992baaf0006e34ca94
Author: Jose Torres
Date:   2018-04-30T22:11:56Z

    split out EpochPollRunnable

commit 4f9f16142afdf75edc2a8cbaebe36546305aa832
Author: Jose Torres
Date:   2018-04-30T22:17:12Z

    split data reader thread and fix file name

commit 54c0bf1b65e22157166e2159cf886b912a07828e
Author: Jose Torres
Date:   2018-04-30T22:30:44Z

    fix imports

commit 373826e129c522575df5d2c26c7ec56cca218c40
Author: Jose Torres
Date:   2018-04-30T22:35:21Z

    add ContinuousDataSourceRDD docs

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
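
The two functional changes listed in the pull request description (one compute() call per epoch, and a reader shared across those calls for the same partition) can be illustrated with a small standalone sketch. This is not the code from the patch and it does not use Spark: SharedReader, SketchRDD, EpochLoopSketch, and runEpochs are hypothetical names invented for the illustration, and offsets are plain integers standing in for real partition offsets.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ContinuousQueuedDataReader: it remembers its
// position between calls, so each epoch resumes where the last one stopped.
final class SharedReader(partition: Int) {
  private var nextOffset = 0

  // Returns the rows for one epoch and advances the stored offset.
  def readEpoch(rowsPerEpoch: Int): Iterator[String] = {
    val start = nextOffset
    nextOffset += rowsPerEpoch
    (start until start + rowsPerEpoch).iterator.map(o => s"p$partition-row$o")
  }
}

// Hypothetical stand-in for ContinuousDataSourceRDD: compute() may be called
// many times for the same partition and must hand back the same reader.
final class SketchRDD(rowsPerEpoch: Int) {
  private val readers = mutable.Map[Int, SharedReader]()

  def compute(partition: Int): Iterator[String] = {
    // Create the reader on first use and reuse it afterwards, guarded by the
    // map's own monitor.
    val reader = readers.synchronized {
      readers.getOrElseUpdate(partition, new SharedReader(partition))
    }
    reader.readEpoch(rowsPerEpoch)
  }
}

object EpochLoopSketch {
  // Hypothetical writer-side loop: one compute() call per epoch.
  def runEpochs(rdd: SketchRDD, partition: Int, epochs: Int): Seq[List[String]] =
    (0 until epochs).map(_ => rdd.compute(partition).toList)

  def main(args: Array[String]): Unit = {
    val rdd = new SketchRDD(rowsPerEpoch = 2)
    runEpochs(rdd, partition = 0, epochs = 3).zipWithIndex.foreach {
      case (rows, epoch) => println(s"epoch $epoch: ${rows.mkString(", ")}")
    }
  }
}
```

Because the reader is looked up in the map instead of being rebuilt, the second epoch in this sketch starts at offset 2 and does not restart at 0, which is the "continuity in offsets" that the comment in the diff refers to.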