GitHub user xuanyuanking commented on a diff in the pull request:
    --- Diff: 
    @@ -0,0 +1,126 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming.continuous
    +import scala.util.control.NonFatal
    +import org.apache.spark.{SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.util.Utils
    + * The physical plan for writing data into a continuous processing 
    + */
    +case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: 
    +    extends SparkPlan with Logging {
    +  override def children: Seq[SparkPlan] = Seq(query)
    +  override def output: Seq[Attribute] = Nil
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    val writerFactory = writer match {
    +      case w: SupportsWriteInternalRow => 
    +      case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
    +    }
    +    val rdd = query.execute()
    +    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
    +    logInfo(s"Start processing data source writer: $writer. " +
    +      s"The input RDD has ${messages.length} partitions.")
    +    // Let the epoch coordinator know how many partitions the write RDD 
    +    EpochCoordinatorRef.get(
    +        sparkContext.env)
    +      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
    +    try {
    +      // Force the RDD to run so continuous processing starts; no data is 
actually being collected
    +      // to the driver, as ContinuousWriteRDD outputs nothing.
    +      sparkContext.runJob(
    +        rdd,
    +        (context: TaskContext, iter: Iterator[InternalRow]) =>
    +, context, 
    +        rdd.partitions.indices)
    +    } catch {
    +      case _: InterruptedException =>
    +        // Interruption is how continuous queries are ended, so accept and 
ignore the exception.
    +      case cause: Throwable =>
    +        cause match {
    +          // Do not wrap interruption exceptions that will be handled by 
streaming specially.
    +          case _ if StreamExecution.isInterruptionException(cause) => 
throw cause
    +          // Only wrap non fatal exceptions.
    +          case NonFatal(e) => throw new SparkException("Writing job 
aborted.", e)
    +          case _ => throw cause
    +        }
    +    }
    +    sparkContext.emptyRDD
    +  }
    +object WriteToContinuousDataSourceExec extends Logging {
    +  def run(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): Unit = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      SparkEnv.get)
    +    val currentMsg: WriterCommitMessage = null
    --- End diff --
    Is `currentMsg` no longer needed?


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to