[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21116
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21116#discussion_r183489854

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${messages.length} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

Nvm. I see the msg being sent back using the epochCoordinator. Then let's just remove the `currentMsg`.
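For reference, a minimal sketch of what the per-partition `run` loop could look like once `currentMsg` is dropped and the commit message is sent straight to the epoch coordinator. This is not the exact code from the PR: the `CommitPartitionEpoch` message name, the `createDataWriter(partitionId, attemptNumber)` signature, and the `START_EPOCH_KEY` property are assumptions based on the surrounding discussion and may differ from the final version. It also assumes the imports already present in the file above, plus an epoch-aware input iterator whose `hasNext` returns false at an epoch boundary and then resumes.

    def run(
        writeTask: DataWriterFactory[InternalRow],
        context: TaskContext,
        iter: Iterator[InternalRow]): Unit = {
      val epochCoordinator = EpochCoordinatorRef.get(
        context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
        SparkEnv.get)
      // Assumed property name; the real key may differ.
      var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

      do {
        var dataWriter: DataWriter[InternalRow] = null
        // Write one epoch's worth of data, then commit this writer for that epoch.
        Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
          try {
            dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
            while (iter.hasNext) {
              dataWriter.write(iter.next())
            }
            // No local `currentMsg`: the commit message goes straight back to the coordinator.
            val msg = dataWriter.commit()
            epochCoordinator.send(
              CommitPartitionEpoch(context.partitionId(), currentEpoch, msg))
            currentEpoch += 1
          } catch {
            case _: InterruptedException =>
              // Continuous shutdown always involves an interrupt; just finish the task.
          }
        })(catchBlock = {
          // If there is an error, abort this writer so the sink can clean up.
          if (dataWriter != null) dataWriter.abort()
        })
      } while (!context.isInterrupted())
    }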
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21116#discussion_r183489611

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
[... same new-file diff as quoted above, ending at:]
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

Good point. I see it's no longer used anywhere. That raises two points:
- This should be removed.
- Does this continuous code path ...
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21116#discussion_r183488923

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
[... same new-file diff as quoted above, ending at:]
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
--- End diff --

Is this really needed? Its only use is in the logInfo that follows, and only for its length, which is effectively `rdd.partitions.length`.
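A minimal sketch of the simplification being suggested here (not the committed change itself): drop the unused `WriterCommitMessage` array and log the partition count directly, using only the names already present in the quoted diff.

    val rdd = query.execute()

    // The commit-message array is never filled on this path, so log the count directly.
    logInfo(s"Start processing data source writer: $writer. " +
      s"The input RDD has ${rdd.partitions.length} partitions.")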
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21116#discussion_r183224838

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
[... same new-file diff as quoted above, ending at:]
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --

currentMsg is no longer needed?
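For background on why the task side never needs to hold a `currentMsg` locally: the epoch coordinator is the component that accumulates per-partition commit messages and hands a complete epoch's worth to the `StreamWriter`. The following is a simplified, hypothetical sketch of that collection logic; the `CommitPartitionEpoch` shape and the `EpochCommitTracker` helper are illustrative assumptions, not code from this PR or from EpochCoordinator itself.

    // Hypothetical coordinator-side sketch: collect one WriterCommitMessage per
    // (epoch, partition) and commit the epoch once every writer partition has reported.
    case class CommitPartitionEpoch(partitionId: Int, epoch: Long, message: WriterCommitMessage)

    class EpochCommitTracker(writer: StreamWriter, numWriterPartitions: Int) {
      private val partitionCommits =
        scala.collection.mutable.Map[(Long, Int), WriterCommitMessage]()

      def onCommit(msg: CommitPartitionEpoch): Unit = {
        partitionCommits.put((msg.epoch, msg.partitionId), msg.message)
        val thisEpoch = partitionCommits.collect {
          case ((epoch, _), commitMsg) if epoch == msg.epoch => commitMsg
        }
        if (thisEpoch.size == numWriterPartitions) {
          // All partitions reported for this epoch; hand the messages to the sink.
          writer.commit(msg.epoch, thisEpoch.toArray)
          partitionCommits.retain { case ((epoch, _), _) => epoch != msg.epoch }
        }
      }
    }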
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/21116

    [SPARK-24038][SS] Refactor continuous writing to its own class

    ## What changes were proposed in this pull request?

    Refactor continuous writing to its own class.

    See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.

    ## How was this patch tested?

    existing unit tests - refactoring only

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark SPARK-24038

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21116.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21116

commit 0e8d80da1350b950fd690ff7c762d07d0767eafd
Author: Jose Torres
Date:   2018-03-21T22:05:31Z

    partial commit

commit 270f8ffe062d76e44a726753afc07c78348c3cc6
Author: Jose Torres
Date:   2018-03-22T06:08:52Z

    make ContinuousWriteExec work

commit 0cfeaeb6c0e6a3b500341852db8c53359120a753
Author: Jose Torres
Date:   2018-03-23T02:44:10Z

    fix docs

commit 7c375339bac0704c99ba6d87ee671dc3b7c0f531
Author: Jose Torres
Date:   2018-03-26T16:47:16Z

    remove old path

commit 26c1eadc67ffc48bcaf877154660982455892389
Author: Jose Torres
Date:   2018-03-26T17:44:19Z

    rm old path

commit e53707f1c2b836c38d00ad9527f1cf7b498051b7
Author: Jose Torres
Date:   2018-03-27T02:15:00Z

    format + docs

commit 4a7a4cc4aef2e27025e019d8dde29c30026cb330
Author: Jose Torres
Date:   2018-03-27T02:16:28Z

    rename node

commit bf1bef4a40d18fbc55f2a905dee7c01649af47ca
Author: Jose Torres
Date:   2018-03-30T00:59:36Z

    remove inheritance altogether

commit 9303e4e5a233109f00b25e68e2dfbec6a9aa218d
Author: Jose Torres
Date:   2018-04-20T18:18:57Z

    don't do StreamWriter changes

commit 3d8dfa415902d1d7be45a36923d2d355936eefbe
Author: Jose Torres
Date:   2018-04-20T18:19:25Z

    Merge remote-tracking branch 'apache/master' into SPARK-24038
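To make the refactor concrete: after this change, continuous writes get their own physical node, `WriteToContinuousDataSourceExec(writer, query)`, rather than sharing the micro-batch write path. Below is a hedged sketch of how a planner rule could map a logical write node onto it; the logical node name `WriteToContinuousDataSource` and the strategy wiring are illustrative assumptions, not code from this PR.

    // Illustrative only: assumes a logical node WriteToContinuousDataSource(writer, query)
    // exists alongside the physical node added in this PR.
    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec

    object ContinuousWriteStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case WriteToContinuousDataSource(writer, query) =>
          // Plan the child query and wrap it in the new continuous write node.
          WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
        case _ => Nil
      }
    }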