[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21116


---




[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21116#discussion_r183489854
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${messages.length} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

Nvm. I see the commit message being sent back through the epochCoordinator. Then let's just remove `currentMsg`.
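
For reference, a minimal sketch of what `run` could look like once `currentMsg` is gone, committing the writer and handing the message straight to the epoch coordinator. This sits inside the same object as the quoted diff; the `CommitPartitionEpoch` message, the `START_EPOCH_KEY` property, and the two-argument `createDataWriter` signature are assumptions for illustration, not taken from this diff:

```scala
// Sketch only: CommitPartitionEpoch, START_EPOCH_KEY and the createDataWriter
// signature are assumed; the EpochCoordinatorRef plumbing matches the diff above.
def run(
    writeTask: DataWriterFactory[InternalRow],
    context: TaskContext,
    iter: Iterator[InternalRow]): Unit = {
  val epochCoordinator = EpochCoordinatorRef.get(
    context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    SparkEnv.get)
  // Epoch this task starts writing at (assumed local property).
  val currentEpoch =
    context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

  val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
  while (iter.hasNext) {
    dataWriter.write(iter.next())
  }
  // No currentMsg local: commit the writer and send the message back in one step.
  epochCoordinator.send(
    CommitPartitionEpoch(context.partitionId(), currentEpoch, dataWriter.commit()))
}
```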


---


[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21116#discussion_r183489611
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${messages.length} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

Good point. I see it's no longer used anywhere. That raises two questions:
- This should be removed.
- Does this continuous code path

[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21116#discussion_r183488923
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
--- End diff --

Is this really needed? Its only use is in the logInfo above, and even then only for its length, which is effectively `rdd.partitions.length`.
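
For illustration, the simplification could be as small as driving both the log line and the coordinator message off the RDD itself:

```scala
// Sketch of the suggested change: no WriterCommitMessage array, just the RDD's partition count.
val rdd = query.execute()

logInfo(s"Start processing data source writer: $writer. " +
  s"The input RDD has ${rdd.getNumPartitions} partitions.")
// Let the epoch coordinator know how many partitions the write RDD has.
EpochCoordinatorRef.get(
    sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    sparkContext.env)
  .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
```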


---




[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-21 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21116#discussion_r183224838
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${messages.length} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

`currentMsg` is no longer needed?


---


[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

2018-04-20 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/21116

[SPARK-24038][SS] Refactor continuous writing to its own class

## What changes were proposed in this pull request?

Refactor continuous writing to its own class.

See the WIP at https://github.com/jose-torres/spark/pull/13 for the overall direction this is going in, but I think this PR is well isolated and necessary regardless.
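
For a rough picture of where the extracted node slots in, here is a hypothetical planner-rule sketch; the `WriteToContinuousDataSource` logical node named below is an assumption for illustration, not something this diff introduces:

```scala
// Hypothetical sketch: route a logical continuous-write node to the new physical node.
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec

object ContinuousWriteStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case WriteToContinuousDataSource(writer, query) =>
      // Plan the child lazily and wrap it in the new continuous-write exec node.
      WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
    case _ => Nil
  }
}
```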

## How was this patch tested?

Existing unit tests; this change is a refactoring only.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark SPARK-24038

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21116


commit 0e8d80da1350b950fd690ff7c762d07d0767eafd
Author: Jose Torres 
Date:   2018-03-21T22:05:31Z

partial

commit 270f8ffe062d76e44a726753afc07c78348c3cc6
Author: Jose Torres 
Date:   2018-03-22T06:08:52Z

make ContinuousWriteExec work

commit 0cfeaeb6c0e6a3b500341852db8c53359120a753
Author: Jose Torres 
Date:   2018-03-23T02:44:10Z

fix docs

commit 7c375339bac0704c99ba6d87ee671dc3b7c0f531
Author: Jose Torres 
Date:   2018-03-26T16:47:16Z

remove old path

commit 26c1eadc67ffc48bcaf877154660982455892389
Author: Jose Torres 
Date:   2018-03-26T17:44:19Z

rm old path

commit e53707f1c2b836c38d00ad9527f1cf7b498051b7
Author: Jose Torres 
Date:   2018-03-27T02:15:00Z

format + docs

commit 4a7a4cc4aef2e27025e019d8dde29c30026cb330
Author: Jose Torres 
Date:   2018-03-27T02:16:28Z

rename node

commit bf1bef4a40d18fbc55f2a905dee7c01649af47ca
Author: Jose Torres 
Date:   2018-03-30T00:59:36Z

remove inheritance altogether

commit 9303e4e5a233109f00b25e68e2dfbec6a9aa218d
Author: Jose Torres 
Date:   2018-04-20T18:18:57Z

don't do StreamWriter changes

commit 3d8dfa415902d1d7be45a36923d2d355936eefbe
Author: Jose Torres 
Date:   2018-04-20T18:19:25Z

Merge remote-tracking branch 'apache/master' into SPARK-24038




---
