Adding DEV.

Or is there any other way to do subtractByKey using Dataset APIs?

Thanks
Ankur

On Wed, Mar 1, 2017 at 1:28 PM, Ankur Srivastava <ankur.srivast...@gmail.com
> wrote:

> Hi Users,
>
> We are facing an issue with a left_outer join using the Spark 2.0 Dataset
> API from Java. Below is the code we have:
>
> Dataset<Row> badIds = filteredDS.groupBy(col("id").alias("bid")).count()
>         .filter((FilterFunction<Row>) row -> (Long) row.getAs("count") > 
> 75000);
> _logger.info("Id count with over 75K records that will be filtered: " + 
> badIds.count());
>
> Dataset<SomeData> fiteredRows = filteredDS.join(broadcast(badIds), 
> filteredDS.col("id").equalTo(badDevices.col("bid")), "left_outer")
>         .filter((FilterFunction<Row>) row ->  row.getAs("bid") == null)
>         .map((MapFunction<Row, SomeData>) row -> 
> SomeDataFactory.createObjectFromDDRow(row), Encoders.bean(DeviceData.class));
>
>
> We get the counts in the log file, and then the application fails with the
> exception below:
> Exception in thread "main" java.lang.UnsupportedOperationException: Only
> code-generated evaluation is supported.
>         at org.apache.spark.sql.catalyst.expressions.objects.Invoke.
> eval(objects.scala:118)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
> org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$
> canFilterOutNull(joins.scala:109)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
> anonfun$7.apply(joins.scala:118)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
> anonfun$7.apply(joins.scala:118)
>         at scala.collection.LinearSeqOptimized$class.
> exists(LinearSeqOptimized.scala:93)
>         at scala.collection.immutable.List.exists(List.scala:84)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
> org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$
> buildNewJoinType(joins.scala:118)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
> anonfun$apply$2.applyOrElse(joins.scala:133)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$
> anonfun$apply$2.applyOrElse(joins.scala:131)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.
> apply(TreeNode.scala:279)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.
> apply(TreeNode.scala:279)
>         at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.
> withOrigin(TreeNode.scala:69)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:278)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
> apply(TreeNode.scala:321)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> mapProductIterator(TreeNode.scala:179)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> transformChildren(TreeNode.scala:319)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
> apply(TreeNode.scala:321)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> mapProductIterator(TreeNode.scala:179)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> transformChildren(TreeNode.scala:319)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
> apply(TreeNode.scala:321)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> mapProductIterator(TreeNode.scala:179)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> transformChildren(TreeNode.scala:319)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> transformDown$1.apply(TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.
> apply(TreeNode.scala:321)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> mapProductIterator(TreeNode.scala:179)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.
> transformChildren(TreeNode.scala:319)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(
> TreeNode.scala:284)
>         at org.apache.spark.sql.catalyst.trees.TreeNode.transform(
> TreeNode.scala:268)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
> apply(joins.scala:131)
>         at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.
> apply(joins.scala:98)
>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$
> execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$
> execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>         at scala.collection.IndexedSeqOptimized$class.
> foldl(IndexedSeqOptimized.scala:57)
>         at scala.collection.IndexedSeqOptimized$class.
> foldLeft(IndexedSeqOptimized.scala:66)
>         at scala.collection.mutable.WrappedArray.foldLeft(
> WrappedArray.scala:35)
>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$
> execute$1.apply(RuleExecutor.scala:82)
>         at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$
> execute$1.apply(RuleExecutor.scala:74)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(
> RuleExecutor.scala:74)
>         at org.apache.spark.sql.execution.QueryExecution.
> optimizedPlan$lzycompute(QueryExecution.scala:74)
>         at org.apache.spark.sql.execution.QueryExecution.
> optimizedPlan(QueryExecution.scala:74)
>         at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan$lzycompute(QueryExecution.scala:78)
>         at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan(QueryExecution.scala:76)
>         at org.apache.spark.sql.execution.QueryExecution.
> executedPlan$lzycompute(QueryExecution.scala:83)
>         at org.apache.spark.sql.execution.QueryExecution.
> executedPlan(QueryExecution.scala:83)
>         at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>         at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>         at test.Driver.main(Driver.java:106)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
> SparkSubmit.scala:185)
>         at org.apache.spark.deploy.SparkSubmit$.submit(
> SparkSubmit.scala:210)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:124)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks
> Ankur
>

Reply via email to