Adding dev. Also, is there any other way to do subtractByKey using the Dataset API? One alternative I am looking at is sketched below.
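The closest thing I have found so far is a left_anti join, which keeps only the rows whose key has no match on the right side, much like subtractByKey on pair RDDs. A rough sketch only, with placeholder Dataset names and an assumed "id" key column:

    // Sketch: left_anti join as a Dataset-API analogue of RDD subtractByKey.
    // "ds" and "toSubtract" are placeholder Datasets; "id" is the assumed key column.
    Dataset<Row> remaining = ds.join(
        toSubtract,
        ds.col("id").equalTo(toSubtract.col("id")),
        "left_anti");

Since this avoids the left_outer join plus null filter entirely, it may also sidestep the exception quoted below, but I have not verified that.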
Thanks
Ankur

On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
> Hi Users,
>
> We are facing an issue with a left_outer join using the Spark Dataset API
> (Java, Spark 2.0). Below is the code we have:
>
> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>     .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 75000);
> _logger.info("Id count with over 75K records that will be filtered: " + badIds.count());
>
> Dataset<SomeData> filteredRows = filteredDS
>     .join(broadcast(badIds), filteredDS.col("id").equalTo(badIds.col("bid")), "left_outer")
>     .filter((FilterFunction<Row>) row -> row.getAs("bid") == null)
>     .map((MapFunction<Row, SomeData>) row -> SomeDataFactory.createObjectFromDDRow(row),
>         Encoders.bean(SomeData.class));
>
> We get the counts in the log file, and then the application fails with the
> exception below:
>
> Exception in thread "main" java.lang.UnsupportedOperationException: Only code-generated evaluation is supported.
>     at org.apache.spark.sql.catalyst.expressions.objects.Invoke.eval(objects.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:109)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$7.apply(joins.scala:118)
>     at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>     at scala.collection.immutable.List.exists(List.scala:84)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:118)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:133)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:131)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:131)
>     at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(joins.scala:98)
>     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>     at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>     at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
>     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>     at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
>     at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
>     at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
>     at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
>     at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>     at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>     at test.Driver.main(Driver.java:106)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks
> Ankur
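A note on the trace: the failure comes from the optimizer's EliminateOuterJoin rule, whose canFilterOutNull check (joins.scala:109 above) evaluates the filter condition against an all-null row to decide whether the outer join can be narrowed to an inner join. A lambda-based FilterFunction compiles to an Invoke expression, which supports only code-generated evaluation (objects.scala:118 above), hence the UnsupportedOperationException. One possible workaround, a sketch only, reusing the names from the snippet above and assuming a static import of org.apache.spark.sql.functions.*, is to express the null check as an untyped Column predicate the optimizer can evaluate:

    // Sketch of a possible workaround: replace the lambda FilterFunction with an
    // untyped Column predicate. badIds, filteredDS, SomeData, and SomeDataFactory
    // come from the snippet above and are not verified here.
    Dataset<SomeData> filteredRows = filteredDS
        .join(broadcast(badIds), filteredDS.col("id").equalTo(badIds.col("bid")), "left_outer")
        .filter(col("bid").isNull())   // Column-based filter instead of the lambda
        .map((MapFunction<Row, SomeData>) row -> SomeDataFactory.createObjectFromDDRow(row),
            Encoders.bean(SomeData.class));

The left_anti join sketched at the top of this message should achieve the same result in a single join, without the explicit null filter.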