Hi,

I'm getting the following exception with Spark 1.5.2-rc2 (I haven't tried
1.6.0 yet) when using the window function lag:

[2015-11-05 10:58:50,806] ERROR xo.builder.jobs.CompareJob [] [akka://JobServer/user/context-supervisor/MYCONTEXT] - Comparison has failed
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'lag(RANDOM_FIELD7307,0,null)
        at org.apache.spark.sql.catalyst.expressions.UnresolvedWindowFunction.dataType(windowExpressions.scala:277)
        at org.apache.spark.sql.catalyst.expressions.BinaryOperator.checkInputDataTypes(Expression.scala:419)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:58)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:292)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328

I searched JIRA for similar bugs, but found only one loosely related
issue, reported under quite different conditions, with the suggestion that
it might be influenced by the speculation flag being turned on.

This error happens in the following piece of code:

        LOG.info("Calculating diff for " + Joiner.on(',').join(cs.getKeyColumns())
                + " and values " + Joiner.on(',').join(cs.getValueColumns()));
        // get combined grouped data
        leftDF = leftDF.withColumn("level", functions.lit(1)); // first
        rightDF = rightDF.withColumn("level", functions.lit(2)); // second
        DataFrame both = leftDF.unionAll(rightDF);
        // get status
        both.withColumn(DIFF_COLUMN_NAME,
                functions.first(status(valueColDiff(both, cs.getValueColumns()), both.col("level")))
                        .over(Window.partitionBy(DFUtils.toColumns(both, cs.getKeyColumns()))
                                .orderBy("level")));

with the following methods used:

    private static Column valueColDiff(DataFrame df, Set<String> valueCols) {
        return valueCols.stream().map(df::col).map(CompareJob::colDiff).reduce((a, b) -> a.and(b)).get();
    }

    private static Column colDiff(Column col) {
        return functions.lag(col, 0).equalTo(functions.lag(col, 1));
    }

    private static Column status(Column diff, Column level) {
        Column leftLevel = functions.lag(level, 0);
        Column rightLevel = functions.lag(level, 1);
        return functions.when(leftLevel.isNull(), EntityChangeStatus.NEW.toString())
                .when(rightLevel.isNull(), EntityChangeStatus.REMOVED.toString())
                .when(diff, EntityChangeStatus.CHANGED.toString())
                .otherwise(EntityChangeStatus.UNCHANGED.toString());
    }
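
For illustration only, the per-key result the code above appears to be aiming for can be sketched in plain Java, without Spark. This is an assumption about the intent, not the actual job: the class name DiffSketch and the use of a Map for the value columns are hypothetical, and the mapping "missing on the left means NEW, missing on the right means REMOVED" is inferred from the order of the when() branches.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the comparison logic; not part of the real CompareJob.
class DiffSketch {
    // Same status names as EntityChangeStatus in the code above.
    enum EntityChangeStatus { NEW, REMOVED, CHANGED, UNCHANGED }

    // left/right hold the value columns for one key on each side;
    // null means the key has no row on that side (assumed semantics).
    static EntityChangeStatus status(Map<String, Object> left, Map<String, Object> right) {
        if (left == null) {
            return EntityChangeStatus.NEW;
        }
        if (right == null) {
            return EntityChangeStatus.REMOVED;
        }
        // Map.equals compares all value columns at once, like and-ing the per-column checks.
        return Objects.equals(left, right)
                ? EntityChangeStatus.UNCHANGED
                : EntityChangeStatus.CHANGED;
    }

    public static void main(String[] args) {
        Map<String, Object> a = Collections.singletonMap("price", 10);
        Map<String, Object> b = Collections.singletonMap("price", 12);
        System.out.println(status(null, a));
        System.out.println(status(a, null));
        System.out.println(status(a, b));
        System.out.println(status(a, a));
    }
}
```

In the Spark version, each key would have at most one row per level inside its partition, and the status would be derived by comparing those rows.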

Any hints?

Thanks,
Jiri
