Hi, I'm getting the following exception with Spark 1.5.2-rc2 (I haven't tried 1.6.0 yet) when using the window function lag:
[2015-11-05 10:58:50,806] ERROR xo.builder.jobs.CompareJob [] [akka://JobServer/user/context-supervisor/MYCONTEXT] - Comparison has failed
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'lag(RANDOM_FIELD7307,0,null)
    at org.apache.spark.sql.catalyst.expressions.UnresolvedWindowFunction.dataType(windowExpressions.scala:277)
    at org.apache.spark.sql.catalyst.expressions.BinaryOperator.checkInputDataTypes(Expression.scala:419)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:58)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:292)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328

I searched JIRA for similar bugs, but found only one loosely related report. It occurred under quite different conditions and came with a suggestion that the problem might be influenced by having the speculation flag turned on.
The error happens in the following piece of code:

    LOG.info("Calculating diff for " + Joiner.on(',').join(cs.getKeyColumns())
            + " and values " + Joiner.on(',').join(cs.getValueColumns()));

    // get combined grouped data
    leftDF = leftDF.withColumn("level", functions.lit(1));   // first
    rightDF = rightDF.withColumn("level", functions.lit(2)); // second
    DataFrame both = leftDF.unionAll(rightDF);

    // get status
    both.withColumn(DIFF_COLUMN_NAME,
            functions.first(status(valueColDiff(both, cs.getValueColumns()), both.col("level")))
                    .over(Window.partitionBy(DFUtils.toColumns(both, cs.getKeyColumns()))
                            .orderBy("level")));

It uses the following methods:

    private static Column valueColDiff(DataFrame df, Set<String> valueCols) {
        return valueCols.stream()
                .map(df::col)
                .map(CompareJob::colDiff)
                .reduce((a, b) -> a.and(b))
                .get();
    }

    private static Column colDiff(Column col) {
        return functions.lag(col, 0).equalTo(functions.lag(col, 1));
    }

    private static Column status(Column diff, Column level) {
        Column leftLevel = functions.lag(level, 0);
        Column rightLevel = functions.lag(level, 1);
        return functions.when(leftLevel.isNull(), EntityChangeStatus.NEW.toString())
                .when(rightLevel.isNull(), EntityChangeStatus.REMOVED.toString())
                .when(diff, EntityChangeStatus.CHANGED.toString())
                .otherwise(EntityChangeStatus.UNCHANGED.toString());
    }

Any hints?

Thanks,
Jiri