Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20097#discussion_r159555174 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -240,31 +240,35 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo "is not supported in streaming DataFrames/Datasets and will be disabled.") } - sink match { - case v1Sink: Sink => - new StreamingQueryWrapper(new MicroBatchExecution( + (sink, trigger) match { + case (v2Sink: ContinuousWriteSupport, trigger: ContinuousTrigger) => + UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode) + new StreamingQueryWrapper(new ContinuousExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, - v1Sink, + v2Sink, trigger, triggerClock, outputMode, + extraOptions, deleteCheckpointOnStop)) - case v2Sink: ContinuousWriteSupport => - UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode) - new StreamingQueryWrapper(new ContinuousExecution( + case (_: MicroBatchWriteSupport, _) | (_: Sink, _) => + new StreamingQueryWrapper(new MicroBatchExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, - v2Sink, + sink, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop)) + case _ => + throw new AnalysisException( --- End diff -- I think is the only other option. MicroBatchWriteSupport and Sink will have already matched with any trigger, ContinuousWriteSupport will have already matched with a continuous trigger, and there aren't any other implementations of BaseStreamingSink. I agree it's cleaner to check explicitly.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org