[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13342





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66562319
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -232,7 +234,7 @@ final class DataFrameWriter private[sql](df: DataFrame) 
{
* @since 1.4.0
*/
   @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataFrameWriter = {
+  def partitionBy(colNames: String*): this.type = {
--- End diff --

Forgot to change it. There is no difference in the bytecode, but it will show `DataFrameWriter.this.type` in the Scaladoc. I will update it.
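
For context, a minimal sketch (using a hypothetical `Builder` class, not the actual Spark source) of how the two return-type styles differ only in the generated documentation, not in chaining behavior or bytecode:

```
class Builder {
  private var cols: Seq[String] = Nil

  // Rendered as `Builder.this.type` in the generated Scaladoc.
  def partitionByA(colNames: String*): this.type = {
    cols = colNames
    this
  }

  // Rendered as `Builder` in the generated Scaladoc; callers can chain either way.
  def partitionByB(colNames: String*): Builder = {
    cols = colNames
    this
  }
}
```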





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66558301
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -232,7 +234,7 @@ final class DataFrameWriter private[sql](df: DataFrame) 
{
* @since 1.4.0
*/
   @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataFrameWriter = {
+  def partitionBy(colNames: String*): this.type = {
--- End diff --

Sorry, I didn't notice this earlier, but why is this suddenly `this.type` and not `DataFrameWriter[T]` like the other methods? Alternatively, why aren't we using `this.type` everywhere?






[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66552040
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A class to consume data generated by a [[ContinuousQuery]]. Typically 
this is used to send the
+ * generated data to external systems. Each partition will use a new 
deserialized instance, so you
+ * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
+ * in the open method
--- End diff --

nit: missing period.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66551851
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -68,9 +71,9 @@ import org.apache.spark.sql.streaming.ContinuousQuery
 abstract class ForeachWriter[T] extends Serializable {
 
   /**
-   * Called when starting to process one partition of new data in the 
executor side. `version` is
-   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
-   * it's guarantee that they will be opened with the same "version".
+   * Called when starting to process one partition of new data in the 
executor. The `version` is
+   * for data deduplication when there are failures. When recovering from 
a failure, some data may
+   * be generated multiple times but they will always have the same 
version.
*
* If this method finds this is a partition from a duplicated data set, 
it can return `false` to
--- End diff --

nit: finds using the `partitionId` and `version` that this partition has 
already been processed, it can return ...
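
For readers following the thread, a hedged sketch of the deduplication contract described here, assuming a hypothetical `committed` set standing in for whatever the target system records about fully processed (partitionId, version) pairs:

```
import org.apache.spark.sql.ForeachWriter

class IdempotentWriter(committed: scala.collection.mutable.Set[(Long, Long)])
  extends ForeachWriter[String] {

  override def open(partitionId: Long, version: Long): Boolean = {
    // Return false to skip a partition/version pair that a previous
    // (failed and recovered) run already processed.
    !committed.contains((partitionId, version))
  }

  override def process(value: String): Unit = {
    // write `value` to the external system
  }

  override def close(errorOrNull: Throwable): Unit = {
    // release any resources acquired in open(); called even when open() returned false
  }
}
```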





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66545305
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 ---
@@ -572,4 +572,25 @@ class DataFrameReaderWriterSuite extends StreamTest 
with BeforeAndAfter {
 
 cq.awaitTermination(2000L)
   }
+
+  test("foreach") {
+import testImplicits._
+
+val ds = spark.read
+  .format("org.apache.spark.sql.streaming.test")
+  .stream()
+  .as[Int]
+
+val cq = ds.write
+  .format("console")
+  .option("checkpointLocation", newMetadataDir)
+  .trigger(ProcessingTime(2.seconds))
+  .foreach(new ForeachWriter[Int] {
+override def open(partitionId: Long, version: Long): Boolean = true
+override def process(value: Int): Unit = {}
+override def close(errorOrNull: Throwable): Unit = {}
+  })
+
+cq.awaitTermination(2000L)
--- End diff --

I'm going to remove this one, as it should be covered in ForeachSinkSuite.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66543331
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
 ---
@@ -572,4 +572,25 @@ class DataFrameReaderWriterSuite extends StreamTest 
with BeforeAndAfter {
 
 cq.awaitTermination(2000L)
   }
+
+  test("foreach") {
+import testImplicits._
+
+val ds = spark.read
+  .format("org.apache.spark.sql.streaming.test")
+  .stream()
+  .as[Int]
+
+val cq = ds.write
+  .format("console")
+  .option("checkpointLocation", newMetadataDir)
+  .trigger(ProcessingTime(2.seconds))
+  .foreach(new ForeachWriter[Int] {
+override def open(partitionId: Long, version: Long): Boolean = true
+override def process(value: Int): Unit = {}
+override def close(errorOrNull: Throwable): Unit = {}
+  })
+
+cq.awaitTermination(2000L)
--- End diff --

What does this test? Whether the query started correctly? Should it not set a global variable to make sure that this actually works, rather than implicitly depending on the behavior of awaitTermination?
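
A hedged sketch of the kind of check being suggested: record processed values in shared, thread-safe state so the test can assert on them (this mirrors what the `ForeachWriterEvent` helper in ForeachSinkSuite does; `SeenValues` and `RecordingWriter` below are hypothetical names for illustration):

```
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.sql.ForeachWriter

// Shared state visible to the test; works in local-mode tests where the
// writer runs in the same JVM as the assertions.
object SeenValues {
  val values = new ConcurrentLinkedQueue[Int]()
}

class RecordingWriter extends ForeachWriter[Int] {
  override def open(partitionId: Long, version: Long): Boolean = true
  override def process(value: Int): Unit = SeenValues.values.add(value)
  override def close(errorOrNull: Throwable): Unit = {}
}

// In the test body, after the data has been pushed through the query:
//   assert(SeenValues.values.contains(1))
```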





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66543397
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -238,7 +238,8 @@ class BucketedReadSuite extends QueryTest with 
SQLTestUtils with TestHiveSinglet
   shuffleLeft: Boolean,
   shuffleRight: Boolean): Unit = {
 withTable("bucketed_table1", "bucketed_table2") {
-  def withBucket(writer: DataFrameWriter, bucketSpec: 
Option[BucketSpec]): DataFrameWriter = {
+  def withBucket(writer: DataFrameWriter[Row], bucketSpec: 
Option[BucketSpec]):
+DataFrameWriter[Row] = {
--- End diff --

Isn't the better syntax:
```
def withBucket(
    writer: DataFrameWriter[Row],
    bucketSpec: Option[BucketSpec]): DataFrameWriter[Row] = {
```





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66541395
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +383,91 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can 
be used to send the data
+   * generated by the [[DataFrame]]/[[Dataset]] to an external system. The 
returned The returned
+   * [[ContinuousQuery]] object can be used to interact with the stream.
+   *
+   * Scala example:
+   * {{{
+   *   datasetOfString.write.foreach(new ForeachWriter[String] {
+   * def open(partitionId: Long, version: Long): Boolean = {
+  *// open connection
+   * }
+   * def process(record: String) = {
+  *// write string to connection
+   * }
+   * def close(errorOrNull: Throwable): Unit = {
--- End diff --

check indents.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66541375
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +383,91 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can 
be used to send the data
+   * generated by the [[DataFrame]]/[[Dataset]] to an external system. The 
returned The returned
+   * [[ContinuousQuery]] object can be used to interact with the stream.
+   *
+   * Scala example:
+   * {{{
+   *   datasetOfString.write.foreach(new ForeachWriter[String] {
+   * def open(partitionId: Long, version: Long): Boolean = {
+  *// open connection
+   * }
+   * def process(record: String) = {
--- End diff --

add blank line.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66541245
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ForeachSinkSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  test("foreach") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
+  val input = MemoryStream[Int]
+  val query = input.toDS().repartition(2).write
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.foreach(new ForeachWriter[Int] {
+
+  private val events = 
mutable.ArrayBuffer[ForeachWriterEvent.Event]()
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+events += ForeachWriterEvent.Open(partition = partitionId, 
version = version)
+true
+  }
+
+  override def process(value: Int): Unit = {
+events += ForeachWriterEvent.Process(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+events += ForeachWriterEvent.Close(error = Option(errorOrNull))
+ForeachWriterEvent.addEvents(events)
+  }
+})
+  input.addData(1, 2, 3, 4)
+  query.processAllAvailable()
+
+  val expectedEventsForPartition0 = Seq(
+ForeachWriterEvent.Open(partition = 0, version = 0),
+ForeachWriterEvent.Process(value = 1),
+ForeachWriterEvent.Process(value = 3),
+ForeachWriterEvent.Close(None)
+  )
+  val expectedEventsForPartition1 = Seq(
+ForeachWriterEvent.Open(partition = 1, version = 0),
+ForeachWriterEvent.Process(value = 2),
+ForeachWriterEvent.Process(value = 4),
+ForeachWriterEvent.Close(None)
+  )
+
+  val allEvents = ForeachWriterEvent.allEvents()
+  assert(allEvents.size === 2)
+  assert {
+allEvents === Seq(expectedEventsForPartition0, 
expectedEventsForPartition1) ||
+  allEvents === Seq(expectedEventsForPartition1, 
expectedEventsForPartition0)
+  }
+  query.stop()
+}
+  }
+
+  test("foreach error") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
+  val input = MemoryStream[Int]
+  val query = input.toDS().repartition(1).write
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.foreach(new ForeachWriter[Int] {
+
+  private val events = 
mutable.ArrayBuffer[ForeachWriterEvent.Event]()
+
+  private var currentPartitionId = -1L
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+currentPartitionId = partitionId
+events += ForeachWriterEvent.Open(partition = partitionId, 
version = version)
+true
+  }
+
+  override def process(value: Int): Unit = {
+events += ForeachWriterEvent.Process(value)
+throw new RuntimeException("error")
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+events += ForeachWriterEvent.Close(error = Option(errorOrNull))
+ForeachWriterEvent.addEvents(events)
+  }
+})
+  input.addData(1, 2, 3, 4)
+  query.processAllAvailable()
+
+  val allEvents = ForeachWriterEvent.allEvents()
+  assert(allEvents.size === 1)
+  

[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66541219
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ForeachSinkSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  test("foreach") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
+  val input = MemoryStream[Int]
+  val query = input.toDS().repartition(2).write
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.foreach(new ForeachWriter[Int] {
+
+  private val events = 
mutable.ArrayBuffer[ForeachWriterEvent.Event]()
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+events += ForeachWriterEvent.Open(partition = partitionId, 
version = version)
+true
+  }
+
+  override def process(value: Int): Unit = {
+events += ForeachWriterEvent.Process(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+events += ForeachWriterEvent.Close(error = Option(errorOrNull))
+ForeachWriterEvent.addEvents(events)
+  }
+})
+  input.addData(1, 2, 3, 4)
+  query.processAllAvailable()
+
+  val expectedEventsForPartition0 = Seq(
+ForeachWriterEvent.Open(partition = 0, version = 0),
+ForeachWriterEvent.Process(value = 1),
+ForeachWriterEvent.Process(value = 3),
+ForeachWriterEvent.Close(None)
+  )
+  val expectedEventsForPartition1 = Seq(
+ForeachWriterEvent.Open(partition = 1, version = 0),
+ForeachWriterEvent.Process(value = 2),
+ForeachWriterEvent.Process(value = 4),
+ForeachWriterEvent.Close(None)
+  )
+
+  val allEvents = ForeachWriterEvent.allEvents()
+  assert(allEvents.size === 2)
+  assert {
+allEvents === Seq(expectedEventsForPartition0, 
expectedEventsForPartition1) ||
+  allEvents === Seq(expectedEventsForPartition1, 
expectedEventsForPartition0)
+  }
+  query.stop()
+}
+  }
+
+  test("foreach error") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
+  val input = MemoryStream[Int]
+  val query = input.toDS().repartition(1).write
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.foreach(new ForeachWriter[Int] {
+
+  private val events = 
mutable.ArrayBuffer[ForeachWriterEvent.Event]()
+
+  private var currentPartitionId = -1L
--- End diff --

Not used anywhere. Can't this writer be deduped?





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r6654
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ForeachSinkSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  test("foreach") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
+  val input = MemoryStream[Int]
+  val query = input.toDS().repartition(2).write
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.foreach(new ForeachWriter[Int] {
+
+  private val events = 
mutable.ArrayBuffer[ForeachWriterEvent.Event]()
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+events += ForeachWriterEvent.Open(partition = partitionId, 
version = version)
+true
+  }
+
+  override def process(value: Int): Unit = {
+events += ForeachWriterEvent.Process(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+events += ForeachWriterEvent.Close(error = Option(errorOrNull))
+ForeachWriterEvent.addEvents(events)
+  }
+})
+  input.addData(1, 2, 3, 4)
+  query.processAllAvailable()
+
+  val expectedEventsForPartition0 = Seq(
+ForeachWriterEvent.Open(partition = 0, version = 0),
+ForeachWriterEvent.Process(value = 1),
+ForeachWriterEvent.Process(value = 3),
+ForeachWriterEvent.Close(None)
+  )
+  val expectedEventsForPartition1 = Seq(
+ForeachWriterEvent.Open(partition = 1, version = 0),
+ForeachWriterEvent.Process(value = 2),
+ForeachWriterEvent.Process(value = 4),
+ForeachWriterEvent.Close(None)
+  )
+
+  val allEvents = ForeachWriterEvent.allEvents()
+  assert(allEvents.size === 2)
+  assert {
+allEvents === Seq(expectedEventsForPartition0, 
expectedEventsForPartition1) ||
+  allEvents === Seq(expectedEventsForPartition1, 
expectedEventsForPartition0)
+  }
+  query.stop()
+}
+  }
+
+  test("foreach error") {
--- End diff --

nit: foreach with error





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66541011
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ForeachSinkSuite extends StreamTest with SharedSQLContext {
+
+  import testImplicits._
+
+  test("foreach") {
+ForeachWriterEvent.clear()
+withTempDir { checkpointDir =>
--- End diff --

Test failures will not stop the query, which might lead to cascading failures. Adding an `after { ... }` block to stop all queries may be a good idea.
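
A sketch of the suggested cleanup, assuming the suite mixes in ScalaTest's `BeforeAndAfter` and that active queries are reachable through `spark.streams` (the exact accessor in this branch may differ):

```
import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.SharedSQLContext

class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {

  // Stop any query that a failed test left running, so later tests
  // do not see cascading failures.
  after {
    spark.streams.active.foreach(_.stop())
  }

  // ... tests ...
}
```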





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66540602
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
--- End diff --

for data deduplication **when there are failures.**

When recovering from a failure, some data may be **generated multiple 
times** but they will always have the same version. 





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66539666
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
+   *
+   * If this method finds this is a partition from a duplicated data set, 
it can return `false` to
+   * skip the further data processing. However, `close` still will be 
called for cleaning up
+   * resources.
+   *
+   * @param partitionId the partition id.
+   * @param version a unique id for data deduplication.
+   * @return a flat that indicates if the data should be processed.
+   */
+  def open(partitionId: Long, version: Long): Boolean
+
+  /**
+   * Called to process the data in the executor side.
+   */
+  def process(value: T): Unit
+
+  /**
+   * Called when stopping to process one partition of new data in the 
executor side. This is
+   * guaranteed to be called when a `Throwable` is thrown during 
processing data. However,
+   * `close` won't be called in the following cases:
+   *  - JVM crashes without throwing a `Throwable`
+   *  - `open` throws a `Throwable`.
+   *
+   * @param errorOrNull the error thrown during processing data or null if 
nothing is thrown.
--- End diff --

> This method will be called only if open has returned true.

We should call `close` even if `open` returns `false`. The user may open some connections to check the existing data.
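
A hedged sketch (not the actual ForeachSink implementation) of the per-partition contract being agreed on here: `close` is invoked whether or not `open` returned `true`, since `open` may have acquired resources while checking for already-processed data:

```
import org.apache.spark.sql.ForeachWriter

object ForeachContractSketch {
  def runPartition[T](
      writer: ForeachWriter[T],
      partitionId: Long,
      version: Long,
      rows: Iterator[T]): Unit = {
    val shouldProcess = writer.open(partitionId, version)
    var error: Throwable = null
    try {
      if (shouldProcess) {
        rows.foreach(writer.process)
      }
    } catch {
      case t: Throwable =>
        error = t
        throw t
    } finally {
      // close() runs even when open() returned false.
      writer.close(error)
    }
  }
}
```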





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66538640
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
--- End diff --

in the executor ~side~.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66538562
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+
+class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
--- End diff --

Add some Scaladoc.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66537635
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
+   *
+   * If this method finds this is a partition from a duplicated data set, 
it can return `false` to
+   * skip the further data processing. However, `close` still will be 
called for cleaning up
+   * resources.
+   *
+   * @param partitionId the partition id.
+   * @param version a unique id for data deduplication.
+   * @return a flat that indicates if the data should be processed.
+   */
+  def open(partitionId: Long, version: Long): Boolean
+
+  /**
+   * Called to process the data in the executor side.
--- End diff --

Also say, 
This method will be called only when open returns true.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66537560
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
+   *
+   * If this method finds this is a partition from a duplicated data set, 
it can return `false` to
+   * skip the further data processing. However, `close` still will be 
called for cleaning up
+   * resources.
+   *
+   * @param partitionId the partition id.
+   * @param version a unique id for data deduplication.
+   * @return a flat that indicates if the data should be processed.
+   */
+  def open(partitionId: Long, version: Long): Boolean
+
+  /**
+   * Called to process the data in the executor side.
+   */
+  def process(value: T): Unit
+
+  /**
+   * Called when stopping to process one partition of new data in the 
executor side. This is
+   * guaranteed to be called when a `Throwable` is thrown during 
processing data. However,
+   * `close` won't be called in the following cases:
+   *  - JVM crashes without throwing a `Throwable`
+   *  - `open` throws a `Throwable`.
+   *
+   * @param errorOrNull the error thrown during processing data or null if 
nothing is thrown.
--- End diff --

if nothing is thrown --> if there was no error.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66537102
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed("foreach")
+assertStreaming(
+  "foreach() on streaming Datasets and DataFrames can only be called 
on continuous queries")
+
+val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
+val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
+df.sparkSession.sessionState.continuousQueryManager.startQuery(
+  queryName,
+  getCheckpointLocation(queryName, required = false),
+  df,
+  sink,
+  outputMode,
+  trigger)
+  }
+
+  /**
+   * Returns the checkpointLocation for a query. If `required` is `true` 
but the checkpoint
+   * location is not set, [[AnalysisException]] will be thrown. If 
`required` is `false`, a temp
+   * folder will be created if the checkpoint location is not set.
+   */
+  private def getCheckpointLocation(queryName: String, required: Boolean): 
String = {
+extraOptions.get("checkpointLocation").map { userSpecified =>
+  new Path(userSpecified).toUri.toString
+}.orElse {
+  df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location 
=>
+new Path(location, queryName).toUri.toString
+  }
+}.getOrElse {
+  if (required) {
+throw new AnalysisException("checkpointLocation must be specified 
either " +
+  "through option() or SQLConf")
--- End diff --

SQLConf is actually not a user-facing concept.

Just say `SparkSession.conf.set`
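
A short sketch of the user-facing alternative the message could point to; the session-level key is assumed to be `spark.sql.streaming.checkpointLocation` (the key behind `SQLConf.CHECKPOINT_LOCATION`), and the per-query route is the existing `option("checkpointLocation", ...)`:

```
import org.apache.spark.sql.SparkSession

object CheckpointLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // Session-wide default, set through the user-facing SparkSession.conf:
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

    // Alternatively, each query can pass .option("checkpointLocation", "...") on its writer.
    spark.stop()
  }
}
```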





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66534481
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
+   *
+   * If this method finds this is a partition from a duplicated data set, 
it can return `false` to
+   * skip the further data processing. However, `close` still will be 
called for cleaning up
+   * resources.
+   *
+   * @param partitionId the partition id.
+   * @param version a unique id for data deduplication.
+   * @return a flat that indicates if the data should be processed.
--- End diff --

return true if the corresponding partition and version id should be 
processed.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66533974
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
--- End diff --

Also, can you add a simple code example showing the use of ForeachWriter?
```
datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = { /* open connection */ true }
  def process(record: String): Unit = { /* write string to connection */ }
  def close(errorOrNull: Throwable): Unit = { /* close the connection */ }
})
```






[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66533645
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
--- End diff --

...as new data arrives. The [[ForeachWriter]] can be used to send the data 
generated by the DataFrame/Dataset to an external system. The returned...





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66533244
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed("foreach")
+assertStreaming(
+  "foreach() on streaming Datasets and DataFrames can only be called 
on continuous queries")
--- End diff --

foreach() can only be called on streaming Datasets/DataFrames.





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66532833
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
--- End diff --

A writer to **write** data generated 





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66532767
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.streaming.ContinuousQuery
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
--- End diff --

usually should --> should do all the initialization (e.g. opening a 
connection or initiating a transaction) in the `open` method
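To make that concrete, a minimal sketch of a writer that opens a connection per partition. The JDBC URL, table name, and the `close(errorOrNull: Throwable)` signature are illustrative assumptions, not part of this patch.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.ForeachWriter

// Each partition uses a freshly deserialized instance, so the connection is
// created in open() on the executor rather than in the constructor on the driver.
class JdbcForeachWriter(url: String) extends ForeachWriter[String] {
  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url)
    statement = connection.prepareStatement("INSERT INTO events(value) VALUES (?)")
    true // process this (partitionId, version); returning false would skip it
  }

  override def process(value: String): Unit = {
    statement.setString(1, value)
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
```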





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66532405
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed("foreach")
+assertStreaming(
+  "foreach() on streaming Datasets and DataFrames can only be called 
on continuous queries")
+
+val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
+val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
+df.sparkSession.sessionState.continuousQueryManager.startQuery(
+  queryName,
+  getCheckpointLocation(queryName, required = false),
+  df,
+  sink,
+  outputMode,
+  trigger)
+  }
+
+  /**
+   * Returns the checkpointLocation for a query. If `required` is `true` 
but the checkpoint
+   * location is not set, [[AnalysisException]] will be thrown. If 
`required` is `false`, a temp
+   * folder will be created if the checkpoint location is not set.
+   */
+  private def getCheckpointLocation(queryName: String, required: Boolean): 
String = {
--- End diff --

The semantics of this method are very confusing. `required` implies that it will throw
an error if no checkpoint location is set; it's not intuitive that when it is not
required it instead creates a temp checkpoint dir. Furthermore, it creates the temp one
named after `memory`, which does not make sense.

It would be cleaner for this to return an `Option[String]` and rename `required` -->
`failIfNotSet`.
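A sketch of what the suggested shape could look like, meant to live inside DataFrameWriter next to the existing fields and imports; the exact error text, conf key spelling, and the fallback handling at the call site are left to the author.

```scala
/**
 * Returns the user-specified checkpoint location, if any. If `failIfNotSet` is true
 * and no location was given via option() or SQLConf, an AnalysisException is thrown;
 * otherwise None is returned and the caller decides whether to create a temp folder.
 */
private def getCheckpointLocation(queryName: String, failIfNotSet: Boolean): Option[String] = {
  val location = extraOptions.get("checkpointLocation").map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
      new Path(location, queryName).toUri.toString
    }
  }
  if (location.isEmpty && failIfNotSet) {
    throw new AnalysisException(
      """checkpointLocation must be specified either through option("checkpointLocation", ...) """ +
        """or spark.conf.set("spark.sql.streaming.checkpointLocation", ...)""")
  }
  location
}
```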





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r66532056
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed("foreach")
+assertStreaming(
+  "foreach() on streaming Datasets and DataFrames can only be called 
on continuous queries")
+
+val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
+val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
+df.sparkSession.sessionState.continuousQueryManager.startQuery(
+  queryName,
+  getCheckpointLocation(queryName, required = false),
+  df,
+  sink,
+  outputMode,
+  trigger)
+  }
+
+  /**
+   * Returns the checkpointLocation for a query. If `required` is `true` 
but the checkpoint
+   * location is not set, [[AnalysisException]] will be thrown. If 
`required` is `false`, a temp
+   * folder will be created if the checkpoint location is not set.
+   */
+  private def getCheckpointLocation(queryName: String, required: Boolean): 
String = {
+extraOptions.get("checkpointLocation").map { userSpecified =>
+  new Path(userSpecified).toUri.toString
+}.orElse {
+  df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location 
=>
+new Path(location, queryName).toUri.toString
+  }
+}.getOrElse {
+  if (required) {
+throw new AnalysisException("checkpointLocation must be specified 
either " +
+  "through option() or SQLConf")
--- End diff --

`option()` --> `option("checkpointLocation", ...)`
`SQLConf` --> `sqlContext.conf..` (complete it)
Makes it easier for the user
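For reference, the two ways a user would then spell it out, assuming `ds` is a streaming Dataset, `writer` a ForeachWriter, and `spark` the SparkSession; the paths are placeholders.

```scala
// Per query, via the writer option:
ds.write
  .option("checkpointLocation", "/tmp/checkpoints/my-query")
  .foreach(writer)

// Or as a session-wide default for streaming queries:
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
```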






[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r65981082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,52 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed()
+assertStreaming("startStream() can only be called on continuous 
queries")
--- End diff --

startStream() --> foreach() on streaming datasets and dataframes





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-01 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r65412780
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -401,6 +381,52 @@ final class DataFrameWriter private[sql](df: 
DataFrame) {
   }
 
   /**
+   * :: Experimental ::
+   * Starts the execution of the streaming query, which will continually 
send results to the given
+   * [[ForeachWriter]] as new data arrives. The returned 
[[ContinuousQuery]] object can be used to
+   * interact with the stream.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
+assertNotBucketed()
+assertStreaming("startStream() can only be called on continuous 
queries")
+
+val queryName = extraOptions.getOrElse("queryName", 
StreamExecution.nextName)
+val sink = new 
ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.unresolvedTEncoder)
+df.sparkSession.sessionState.continuousQueryManager.startQuery(
+  queryName,
+  getCheckpointLocation(queryName, required = false),
+  df,
+  sink,
+  outputMode,
+  trigger)
+  }
+
+  /**
+   * Returns the checkpointLocation for a query. If `required` is `ture` 
but the checkpoint
--- End diff --

`true`





[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

2016-06-01 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13342#discussion_r65412579
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * A writer to consume data generated by a [[ContinuousQuery]]. Each 
partition will use a new
+ * deserialized instance, so you usually should do the initialization work 
in the `open` method.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+trait ForeachWriter[T] extends Serializable {
+
+  /**
+   * Called when starting to process one partition of new data in the 
executor side. `version` is
+   * for data deduplication. When recovering from a failure, some data may 
be processed twice. But
+   * it's guarantee that they will be opened with the same "version".
+   *
+   * If this method finds this is a partition from a duplicated data set, 
it can return `false` to
+   * skip the further data processing. However, `close` still will be 
called for cleaning up
+   * resources.
+   *
+   * @param partitionId the partition id.
+   * @param version a unique id for data deduplication.
+   * @return a flat that indicates if the data should be processed.
+   */
+  def open(partitionId: Long, version: Long): Boolean
+
+  /**
+   * Called to process the data in the executor side.
+   */
+  def process(value: T): Unit
+
+  /**
+   * Called when stopping to process one partition of new data in the 
executor side.
--- End diff --

We should be clear here and above that there are several cases where this
method will not be called (e.g. the stream dying, the JVM dying, etc.).
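As an aside, a small sketch of how the `open`/`version` contract reads from the user side; the committed-version tracking and the `close(errorOrNull: Throwable)` signature are hypothetical, not from this diff.

```scala
import org.apache.spark.sql.ForeachWriter

// Hypothetical helper that records which (partitionId, version) pairs were
// already committed to the external store.
trait CommittedVersions extends Serializable {
  def contains(partitionId: Long, version: Long): Boolean
}

// Skips partitions whose (partitionId, version) was already written, so that
// replays after a failure do not produce duplicates in the sink.
class IdempotentWriter(committed: CommittedVersions) extends ForeachWriter[String] {

  override def open(partitionId: Long, version: Long): Boolean = {
    // Returning false tells Spark to skip process() for this partition,
    // although close() is still invoked for cleanup.
    !committed.contains(partitionId, version)
  }

  override def process(value: String): Unit = {
    // write value to the external store
  }

  // As noted above, this may never run if the stream or the JVM dies,
  // so any cleanup here has to be best-effort.
  override def close(errorOrNull: Throwable): Unit = ()
}
```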

