Hi Users,

We are facing an issue with left_outer join using Spark Dataset api in 2.0
Java API. Below is the code we have

Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
        .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
_logger.info("Id count with over 75K records that will be filtered: "
+ badIds.count());

Dataset<SomeData> fiteredRows = filteredDS.join(broadcast(badIds),
filteredDS.col("id").equalTo(badDevices.col("bid")), "left_outer")
        .filter((FilterFunction<Row>) row ->  row.getAs("bid") == null)
        .map((MapFunction<Row, SomeData>) row ->
SomeDataFactory.createObjectFromDDRow(row),
Encoders.bean(DeviceData.class));


We get the counts in the log file and then the application fils with below
exception
Exception in thread "main" java.lang.UnsupportedOperationException: Only
code-generated evaluation is supported.
        at
org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
        at
scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
        at scala.collection.immutable.List.exists(List.scala:84)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
        at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
        at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:131)
        at
org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:98)
        at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
        at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
        at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at
scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
        at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
        at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
        at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
        at
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
        at
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
        at
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
        at
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
        at
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
        at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
        at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
        at test.Driver.main(Driver.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks
Ankur

Reply via email to