Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159985442
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
    @@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] 
(sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
         }
     
    -    sink match {
    -      case v1Sink: Sink =>
    -        new StreamingQueryWrapper(new MicroBatchExecution(
    +    (sink, trigger) match {
    +      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
    +        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    +        new StreamingQueryWrapper(new ContinuousExecution(
               sparkSession,
               userSpecifiedName.orNull,
               checkpointLocation,
               analyzedPlan,
    -          v1Sink,
    +          v2Sink,
               trigger,
               triggerClock,
               outputMode,
    +          extraOptions,
               deleteCheckpointOnStop))
    -      case v2Sink: ContinuousWriteSupport =>
    -        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    -        new StreamingQueryWrapper(new ContinuousExecution(
    +      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
    --- End diff --
    
    As discussed offline, we do throw that error in the MicroBatchExecution 
constructor. Once all the pieces are in we'll need to refactor this a bit to 
get all the checking in the same place.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to