[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user jose-torres closed the pull request at: https://github.com/apache/spark/pull/20552
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167126862

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---

    @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
          query.stop()
        }
      }
    +

--- End diff --

Good instinct; it didn't quite work. Added the test.
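For context, a rough sketch of what a continuous-processing + foreach test can look like. This is not the test added in the PR (the archive does not show it); the rate source, trigger interval, sleep-based wait, and the `seen` collector are illustrative choices that assume a local-mode SparkSession:

    import java.util.concurrent.ConcurrentLinkedQueue

    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
    import org.apache.spark.sql.streaming.Trigger

    object ContinuousForeachSketch extends App {
      val spark = SparkSession.builder().master("local[2]").appName("continuous-foreach").getOrCreate()

      // Collects values seen by the writer; this works because local mode keeps tasks in-process.
      val seen = new ConcurrentLinkedQueue[Long]()

      val query = spark.readStream
        .format("rate")                              // built-in source producing (timestamp, value) rows
        .option("rowsPerSecond", "5")
        .load()
        .writeStream
        .trigger(Trigger.Continuous("1 second"))     // continuous execution rather than microbatch
        .foreach(new ForeachWriter[Row] {
          override def open(partitionId: Long, epochId: Long): Boolean = true
          override def process(row: Row): Unit = seen.add(row.getLong(1))   // index 1 is the rate "value" column
          override def close(errorOrNull: Throwable): Unit = ()
        })
        .start()

      Thread.sleep(5000)                             // crude wait; a real suite would poll with eventually()
      assert(!seen.isEmpty, "expected the foreach writer to receive rows under continuous processing")
      query.stop()
      spark.stop()
    }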
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167126838

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

The hunk (@@ -17,52 +17,119 @@) removes the old ForeachSink, a Sink whose addBatch resolved an encoder against data.logicalPlan.output and iterated data.queryExecution.toRdd.foreachPartition directly. As its comment explained, the simpler data.as[T].foreachPartition would call Dataset.rdd() and create a new plan, breaking incremental planning, metrics collection, and watermark updates, so internal rows were converted to objects manually with the encoder. The hunk also reworks the imports to pull in the DataSourceV2 write-side interfaces (InternalRow, ExpressionEncoder, StreamWriteSupport, StreamWriter, the sources.v2.writer package, OutputMode, StructType, ContinuousExecution). In place of ForeachSink it adds:

    case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
      override def createStreamWriter(
          queryId: String,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceOptions): StreamWriter = {
        val encoder = encoderFor[T].resolveAndBind(
          schema.toAttributes,
          SparkSession.getActiveSession.get.sessionState.analyzer)
        ForeachInternalWriter(writer, encoder)
      }
    }

    case class ForeachInternalWriter[T: Encoder](
        writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
      extends StreamWriter with SupportsWriteInternalRow {
      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

      override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
        ForeachWriterFactory(writer, encoder)
      }
    }

    case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
      extends DataWriterFactory[InternalRow] {
      override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
        new ForeachDataWriter(writer, encoder, partitionId)
      }
    }

    class ForeachDataWriter[T : Encoder](
        private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
      extends DataWriter[InternalRow] {
      private val initialEpochId: Long = {
        // Start with the microbatch ID. If it's not there, we're in continuous execution,
        // so get the start epoch. This ID will be incremented as commits happen.
        TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
          case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
          case batch => batch.toLong
        }
      }
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167120763

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])

--- End diff --

Actually, probably should not inline this; its outer closure may not be serializable in that case.
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167120724

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +case class ForeachInternalWriter[T: Encoder](

--- End diff --

Actually, probably should not inline this; its outer closure may not be serializable in that case.
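To make the outer-closure concern concrete, here is a small, self-contained sketch (not from the PR; the class names are invented): an inner or anonymous class holds a hidden reference to its enclosing instance, so serializing it also tries to serialize that outer object, while a top-level case class like the ones in this patch has nothing extra to capture.

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    case class TopLevelFactory(tag: String)          // serializable on its own, no enclosing instance

    class NotSerializableOuter {                     // stand-in for a non-serializable enclosing class
      class InnerFactory extends Serializable        // silently captures NotSerializableOuter.this
    }

    object OuterClosureDemo extends App {
      private def trySerialize(o: AnyRef): Unit = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          out.writeObject(o)
          println(s"${o.getClass.getName}: serialized fine")
        } catch {
          case e: NotSerializableException =>
            println(s"${o.getClass.getName}: failed, dragged in ${e.getMessage}")
        }
      }

      val outer = new NotSerializableOuter
      trySerialize(new outer.InnerFactory)           // fails: the outer instance is not serializable
      trySerialize(TopLevelFactory("ok"))            // succeeds: nothing extra to capture
    }

A DataWriterFactory has to be shipped to executors, which requires exactly this kind of serialization, so keeping these factories as top-level classes is the safer choice.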
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167081693

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---

    @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
          query.stop()
        }
      }
    +

--- End diff --

I think there should be a test with continuous processing + foreach.
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167077542

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +case class ForeachInternalWriter[T: Encoder](

--- End diff --

nit: This is really a small class. Maybe inline this rather than define a confusing name `...InternalWriter`.
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167076621

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +case class ForeachInternalWriter[T: Encoder](
    +writer: ForeachWriter[T], encoder: ExpressionEncoder[T])

--- End diff --

nit: params on different lines
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167078037

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])

--- End diff --

Similarly, maybe inline this class as well; it's very small.
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167080181

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +class ForeachDataWriter[T : Encoder](
    +    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
    +  extends DataWriter[InternalRow] {
    +  private val initialEpochId: Long = {
    +    // Start with the microbatch ID. If it's not there, we're in continuous execution,
    +    // so get the start epoch. This ID will be incremented as commits happen.
    +    TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
    +      case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +      case batch => batch.toLong
    +    }
    +  }
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167080661

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +class ForeachDataWriter[T : Encoder](
    +    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)

--- End diff --

params in separate lines.
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20552#discussion_r167078671

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---

    +class ForeachDataWriter[T : Encoder](

--- End diff --

add docs describing the implementation of this DataWriter, especially the lifecycle of ForeachWriter (this should go here rather than in inline comments).
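A sketch of the lifecycle such docs would describe. The runEpoch helper below is invented for illustration and is not the PR's code, but the open/process/close sequence mirrors the old ForeachSink.addBatch logic quoted in this thread:

    import org.apache.spark.sql.ForeachWriter

    // Illustrative lifecycle driver: open once per partition/epoch, process every row,
    // then close exactly once, passing any error to close so the user writer can clean up.
    def runEpoch[T](writer: ForeachWriter[T], partitionId: Long, epochId: Long, rows: Iterator[T]): Unit = {
      if (writer.open(partitionId, epochId)) {
        try {
          rows.foreach(writer.process)
        } catch {
          case e: Throwable =>
            writer.close(e)
            throw e
        }
        writer.close(null)
      } else {
        // The writer declined this partition/epoch (e.g. output already handled); still close it.
        writer.close(null)
      }
    }

Per the inline comment in ForeachDataWriter above, the epoch id starts at the microbatch id (or the continuous start epoch) and is incremented as commits happen, so under continuous processing the same writer instance repeats this cycle once per epoch.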
[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...
GitHub user jose-torres opened a pull request: https://github.com/apache/spark/pull/20552

[SPARK-23099][SS] Migrate foreach sink to DataSourceV2

## What changes were proposed in this pull request?

Migrate the foreach sink to the DataSourceV2 API.

## How was this patch tested?

Existing unit tests, and a new test to verify an edge case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark foreach-sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20552.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20552

commit 44de1ea878fb65e4e04ac6cd594f2e7c72ea2d5e
Author: Jose Torres
Date: 2018-02-08T20:28:03Z

    migrate foreach sink
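For readers coming to the thread cold, a minimal sketch of the user-facing foreach sink that this PR reimplements on DataSourceV2. The rate source and println body are illustrative; the migration targets the internal implementation, so this surface API is expected to stay the same.

    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

    object ForeachSinkUsage extends App {
      val spark = SparkSession.builder().master("local[2]").appName("foreach-sink-usage").getOrCreate()

      val query = spark.readStream
        .format("rate")                      // built-in test source: (timestamp, value) rows
        .option("rowsPerSecond", "1")
        .load()
        .writeStream
        .foreach(new ForeachWriter[Row] {    // the foreach sink: user code per partition and epoch
          override def open(partitionId: Long, epochId: Long): Boolean = true
          override def process(row: Row): Unit = println(s"got row: $row")
          override def close(errorOrNull: Throwable): Unit = ()
        })
        .start()

      query.awaitTermination(10000)          // run for ~10s, then shut down
      spark.stop()
    }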