Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159555174
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
    @@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] 
(sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
         }
     
    -    sink match {
    -      case v1Sink: Sink =>
    -        new StreamingQueryWrapper(new MicroBatchExecution(
    +    (sink, trigger) match {
    +      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
    +        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    +        new StreamingQueryWrapper(new ContinuousExecution(
               sparkSession,
               userSpecifiedName.orNull,
               checkpointLocation,
               analyzedPlan,
    -          v1Sink,
    +          v2Sink,
               trigger,
               triggerClock,
               outputMode,
    +          extraOptions,
               deleteCheckpointOnStop))
    -      case v2Sink: ContinuousWriteSupport =>
    -        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    -        new StreamingQueryWrapper(new ContinuousExecution(
    +      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
    +        new StreamingQueryWrapper(new MicroBatchExecution(
               sparkSession,
               userSpecifiedName.orNull,
               checkpointLocation,
               analyzedPlan,
    -          v2Sink,
    +          sink,
               trigger,
               triggerClock,
               outputMode,
               extraOptions,
               deleteCheckpointOnStop))
    +      case _ =>
    +        throw new AnalysisException(
    --- End diff --
    
    I think is the only other option. MicroBatchWriteSupport and Sink will have 
already matched with any trigger, ContinuousWriteSupport will have already 
matched with a continuous trigger, and there aren't any other implementations 
of BaseStreamingSink.
    
    I agree it's cleaner to check explicitly.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to