[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-03-07 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20552


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167126862
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       query.stop()
     }
   }
+
--- End diff --

Good instinct; it didn't quite work. Added the test.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167126838
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
+    new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
+    extends DataWriter[InternalRow] {
+  private val initialEpochId: Long = {
+    // Start with the microbatch ID. If it's not there, we're in continuous execution,
+    // so get the start epoch.
+    // This ID will be incremented as commits happen.
+    TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
+      case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+      case batch => batch.toLong
+    }

[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167120763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
--- End diff --

Actually, we probably should not inline this; its outer closure may not be serializable in that case.
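
To make the concern concrete, here is a minimal self-contained Scala sketch (none of it is code from the PR; every name in it is made up) of why an inlined anonymous factory can fail where a top-level case class does not: the anonymous class keeps an `$outer` reference to its enclosing instance, and that instance may not be serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A factory contract similar in spirit to DataWriterFactory: it must be serializable
// because it is shipped from the driver to executors.
trait WriterFactory extends Serializable {
  def createWriter(partitionId: Int): String
}

// Stands in for a provider/writer holding non-serializable state (e.g. an analyzer).
class Provider(tag: String) {
  // Inlined anonymous factory: it references `tag`, so it keeps an `$outer` pointer
  // to the enclosing Provider, which is not serializable.
  def inlinedFactory: WriterFactory = new WriterFactory {
    override def createWriter(partitionId: Int): String = s"$tag-$partitionId"
  }
}

// Top-level case class: it only carries the fields passed to it explicitly.
case class TopLevelFactory(tag: String) extends WriterFactory {
  override def createWriter(partitionId: Int): String = s"$tag-$partitionId"
}

object OuterClosureDemo {
  private def roundTrip(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    out.close()
  }

  def main(args: Array[String]): Unit = {
    roundTrip(TopLevelFactory("ok"))                 // serializes fine
    try {
      roundTrip(new Provider("boom").inlinedFactory) // drags Provider along and fails
    } catch {
      case e: NotSerializableException => println(s"inlined factory failed: $e")
    }
  }
}
```

Keeping ForeachWriterFactory as a top-level case class means its serialized form carries only the writer and encoder it explicitly holds.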


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167120724
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
--- End diff --

Actually, we probably should not inline this; its outer closure may not be serializable in that case.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167081693
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       query.stop()
     }
   }
+
--- End diff --

I think there should be a test with continuous processing + foreach.
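
For reference, a rough sketch of the kind of coverage being suggested, assuming the continuous-capable rate source and Trigger.Continuous (this is not the test that was eventually added, and a real StreamTest-based suite would collect events and assert on them rather than print):

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object ContinuousForeachSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("continuous-foreach").getOrCreate()

    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .writeStream
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, epochId: Long): Boolean = true
        // A real test would record (partitionId, epochId, value) into a shared collector
        // and assert on it after a few epochs.
        override def process(value: Row): Unit = println(value)
        override def close(errorOrNull: Throwable): Unit = {}
      })
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(10000L)  // let a couple of epochs run
    query.stop()
  }
}
```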


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167077542
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
--- End diff --

nit: This is a really small class. Maybe inline it rather than defining it under the confusing name `...InternalWriter`.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167076621
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
--- End diff --

nit: params on different lines


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167078037
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
--- End diff --

Similarly, maybe inline this class as well; it's very small.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167080181
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
+    new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
+    extends DataWriter[InternalRow] {
+  private val initialEpochId: Long = {
+    // Start with the microbatch ID. If it's not there, we're in continuous execution,
+    // so get the start epoch.
+    // This ID will be incremented as commits happen.
+    TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
+      case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+      case batch => batch.toLong
+    }
+  }

[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167080661
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
+    new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
+    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
--- End diff --

params on separate lines.


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20552#discussion_r167078671
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
@@ -17,52 +17,119 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 
-/**
- * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
- * [[ForeachWriter]].
- *
- * @param writer The [[ForeachWriter]] to process all data.
- * @tparam T The expected type of the sink.
- */
-class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    // This logic should've been as simple as:
-    // ```
-    //   data.as[T].foreachPartition { iter => ... }
-    // ```
-    //
-    // Unfortunately, doing that would just break the incremental planing. The reason is,
-    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
-    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
-    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
-    // updated in the new plan, and StreamExecution cannot retrieval them.
-    //
-    // Hence, we need to manually convert internal rows to objects using encoder.
+
+case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
+  override def createStreamWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceOptions): StreamWriter = {
     val encoder = encoderFor[T].resolveAndBind(
-      data.logicalPlan.output,
-      data.sparkSession.sessionState.analyzer)
-    data.queryExecution.toRdd.foreachPartition { iter =>
-      if (writer.open(TaskContext.getPartitionId(), batchId)) {
-        try {
-          while (iter.hasNext) {
-            writer.process(encoder.fromRow(iter.next()))
-          }
-        } catch {
-          case e: Throwable =>
-            writer.close(e)
-            throw e
-        }
-        writer.close(null)
-      } else {
-        writer.close(null)
+      schema.toAttributes,
+      SparkSession.getActiveSession.get.sessionState.analyzer)
+    ForeachInternalWriter(writer, encoder)
+  }
+}
+
+case class ForeachInternalWriter[T: Encoder](
+    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends StreamWriter with SupportsWriteInternalRow {
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+    ForeachWriterFactory(writer, encoder)
+  }
+}
+
+case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+    extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
+    new ForeachDataWriter(writer, encoder, partitionId)
+  }
+}
+
+class ForeachDataWriter[T : Encoder](
--- End diff --

Add docs describing the implementation of this DataWriter, and especially the lifecycle of the ForeachWriter (that should go here rather than in inline comments).
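
For what it's worth, a rough sketch of what such a class-level doc might say, phrased against the old ForeachSink contract quoted above; the exact wording should be checked against the final lifecycle before it lands:

```scala
/**
 * A [[DataWriter]] that sends every row of one partition to a user-supplied [[ForeachWriter]].
 *
 * Rough lifecycle, per partition and per epoch (a microbatch in microbatch mode, or a
 * continuous-processing epoch otherwise):
 *  - `open(partitionId, epochId)` is called once; if it returns false, the rows of that
 *    epoch are skipped for this partition.
 *  - `process(value)` is called for each row, after the InternalRow is converted back to
 *    a user object with the expression encoder.
 *  - `close(errorOrNull)` is called at the end in either case, with the exception that
 *    stopped processing, or null on success.
 */
```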


---




[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

2018-02-08 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20552

[SPARK-23099][SS] Migrate foreach sink to DataSourceV2

## What changes were proposed in this pull request?

Migrate the foreach sink to the DataSourceV2 API.

## How was this patch tested?

Existing unit tests, plus a new test to verify an edge case.
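
For context, a minimal sketch of the user-facing foreach API this migration is meant to leave unchanged (the socket host/port and the trivial writer are placeholders, not part of the PR):

```scala
import org.apache.spark.sql.{ForeachWriter, SparkSession}

object ForeachSinkUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("foreach-sink-usage").getOrCreate()
    import spark.implicits._

    // Lines from a local socket, handled as a typed Dataset[String].
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()
      .as[String]

    // The foreach sink hands each element of every microbatch (or continuous epoch)
    // to the user-provided ForeachWriter.
    val query = lines.writeStream
      .foreach(new ForeachWriter[String] {
        override def open(partitionId: Long, version: Long): Boolean = true
        override def process(value: String): Unit = println(s"got: $value")
        override def close(errorOrNull: Throwable): Unit = {}
      })
      .start()

    query.awaitTermination()
  }
}
```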


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark foreach-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20552


commit 44de1ea878fb65e4e04ac6cd594f2e7c72ea2d5e
Author: Jose Torres 
Date:   2018-02-08T20:28:03Z

migrate foreach sink




---
