Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159981422
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
    @@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] 
(sparkSession: SparkSession) extends Lo
               "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
         }
     
    -    sink match {
    -      case v1Sink: Sink =>
    -        new StreamingQueryWrapper(new MicroBatchExecution(
    +    (sink, trigger) match {
    +      case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) =>
    +        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    +        new StreamingQueryWrapper(new ContinuousExecution(
               sparkSession,
               userSpecifiedName.orNull,
               checkpointLocation,
               analyzedPlan,
    -          v1Sink,
    +          v2Sink,
               trigger,
               triggerClock,
               outputMode,
    +          extraOptions,
               deleteCheckpointOnStop))
    -      case v2Sink: ContinuousWriteSupport =>
    -        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, 
outputMode)
    -        new StreamingQueryWrapper(new ContinuousExecution(
    +      case (_: MicroBatchWriteSupport, _) | (_: Sink, _) =>
    --- End diff --
    
    Shouldnt we throw error for the case `MicroBatchWriteSupport` (sink does 
not have ContinuousWriteSupport ) and `ContinuousTrigger`???
      


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to