[ 
https://issues.apache.org/jira/browse/SPARK-25424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613830#comment-16613830
 ] 

Raghav Kumar Gautam commented on SPARK-25424:
---------------------------------------------

I have a patch for this. Can someone assign this issue to me?

> Window duration and slide duration with negative values should fail fast
> ------------------------------------------------------------------------
>
>                 Key: SPARK-25424
>                 URL: https://issues.apache.org/jira/browse/SPARK-25424
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Raghav Kumar Gautam
>            Priority: Major
>             Fix For: 2.3.2
>
>
> In the TimeWindow class, window duration and slide duration should not be 
> allowed to take negative values.
> Currently this is enforced by Catalyst. It can instead be enforced by the 
> constructor of TimeWindow, allowing it to fail fast.
> For example, the code below throws the following error. Note that the error 
> is produced at the time of the count() call instead of the window() call.
> {code}
> val df = spark.readStream
>   .format("rate")
>   .option("numPartitions", "2")
>   .option("rowsPerSecond", "10")
>   .load()
>   .filter("value % 20 == 0")
>   .withWatermark("timestamp", "10 seconds")
>   .groupBy(window($"timestamp", "-10 seconds", "5 seconds"))
>   .count()
> {code}
> Error:
> {code}
> cannot resolve 'timewindow(timestamp, -10000000, 5000000, 0)' due to data 
> type mismatch: The window duration (-10000000) must be greater than 0.;;
> 'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], 
> [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, 
> count(1) AS count#57L]
> +- AnalysisBarrier
>       +- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
>          +- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
>             +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, 
> Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], 
> StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond
>  -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]
> org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 
> -10000000, 5000000, 0)' due to data type mismatch: The window duration 
> (-10000000) must be greater than 0.;;
> 'Aggregate [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0)], 
> [timewindow(timestamp#47-T10000ms, -10000000, 5000000, 0) AS window#53, 
> count(1) AS count#57L]
> +- AnalysisBarrier
>       +- EventTimeWatermark timestamp#47: timestamp, interval 10 seconds
>          +- Filter ((value#48L % cast(20 as bigint)) = cast(0 as bigint))
>             +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.RateSourceProvider@52e44f71, rate, 
> Map(rowsPerSecond -> 10, numPartitions -> 2), [timestamp#47, value#48L], 
> StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@221961f2,rate,List(),None,List(),None,Map(rowsPerSecond
>  -> 10, numPartitions -> 2),None), rate, [timestamp#45, value#46L]
>       at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>       at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
>       at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
>       at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
>       at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
>       at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>       at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
>       at 
> org.apache.spark.sql.RelationalGroupedDataset.toDF(RelationalGroupedDataset.scala:66)
>       at 
> org.apache.spark.sql.RelationalGroupedDataset.count(RelationalGroupedDataset.scala:239)
>       at 
> com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply$mcV$sp(HelloScalaTest.scala:39)
>       at 
> com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
>       at 
> com.hortonworks.qe.HelloScalaTest$$anonfun$2.apply(HelloScalaTest.scala:28)
>       at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>       at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>       at org.scalatest.Transformer.apply(Transformer.scala:22)
>       at org.scalatest.Transformer.apply(Transformer.scala:20)
>       at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>       at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
>       at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
>       at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>       at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>       at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
>       at 
> com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$runTest(AbstractTest.scala:11)
>       at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
>       at com.hortonworks.qe.AbstractTest.runTest(AbstractTest.scala:11)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>       at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
>       at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>       at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
>       at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
>       at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
>       at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
>       at org.scalatest.Suite$class.run(Suite.scala:1147)
>       at 
> org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>       at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>       at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
>       at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
>       at 
> com.hortonworks.qe.AbstractTest.org$scalatest$BeforeAndAfter$$super$run(AbstractTest.scala:11)
>       at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
>       at com.hortonworks.qe.AbstractTest.run(AbstractTest.scala:11)
>       at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
>       at 
> org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
>       at 
> org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
>       at 
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
>       at 
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
>       at org.scalatest.tools.Runner$.run(Runner.scala:850)
>       at org.scalatest.tools.Runner.run(Runner.scala)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
>       at 
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to