mrbrahman opened a new issue, #10450:
URL: https://github.com/apache/iceberg/issues/10450
### Apache Iceberg version
None
### Query engine
Spark
### Please describe the bug 🐞
Storage-partitioned join (SPJ) works great when joining two tables. For example:
~~~scala
// SPJ setup
import org.apache.spark.sql.functions._
val df = spark.range(0, 1000000)
val a = df.repartition(10)
  .withColumn("part_1", spark_partition_id())
// create tables with different datasets, but partitioned the same way
a.withColumn("c1", rand()).withColumn("c2", expr("uuid()"))
.write.partitionBy("part_1").format("iceberg").saveAsTable("ice1")
a.withColumn("c3", rand()).withColumn("c4", expr("uuid()"))
.write.partitionBy("part_1").format("iceberg").saveAsTable("ice2")
a.withColumn("c5", rand()).withColumn("c6", expr("uuid()"))
.write.partitionBy("part_1").format("iceberg").saveAsTable("ice3")
~~~
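
All three tables end up with the same ten `part_1` partition values. A quick sanity check is to look at Iceberg's `partitions` metadata table (a minimal sketch, assuming the session catalog is an Iceberg catalog so metadata tables resolve):

~~~scala
// each table should report the same 10 part_1 partitions
spark.table("ice1.partitions")
  .select("partition", "record_count")
  .show()
~~~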
~~~sql
-- SPJ in action
-- disable broadcast joins so the SPJ path is actually exercised
set spark.sql.autoBroadcastJoinThreshold = -1;
-- enable storage-partitioned joins for v2 sources
set spark.sql.sources.v2.bucketing.enabled=true;
-- have Iceberg report partition grouping to Spark during scan planning
set `spark.sql.iceberg.planning.preserve-data-grouping`=true;
-- for our case, need this too: the join keys (id, part_1) are a superset of the partition keys
set spark.sql.requireAllClusterKeysForCoPartition=false;
create table ice_joined_1 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, a.part_1
from ice1 a
inner join ice2 b
on a.id=b.id -- main join key
and a.part_1=b.part_1 -- join on all sub-partition fields to enable SPJ
~~~

No shuffle! **SPJ works!**
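
To double-check it is really SPJ, an `EXPLAIN` of the two-table join should show both Iceberg scans feeding the join directly, with no `Exchange` node in between (a quick verification query, assuming the same session settings as above):

~~~sql
-- with SPJ in effect, no Exchange should appear below the join
explain formatted
select a.id, c1, c3
from ice1 a
inner join ice2 b
on a.id=b.id
and a.part_1=b.part_1;
~~~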
But as soon as I add a third table to the join:
~~~sql
create table ice_joined_2 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice1 a
inner join ice2 b
on a.id=b.id -- main join key
and a.part_1=b.part_1 -- join on all sub-partition fields to enable SPJ
inner join ice3 c
on a.id=c.id -- main join key
and a.part_1=c.part_1 -- join on all sub-partition fields to enable SPJ
;
~~~
... and BAM:
~~~
Error happens in sql:
create table ice_joined_2 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice1 a
inner join ice2 b
on a.id=b.id
and a.part_1=b.part_1
inner join ice3 c
on a.id=c.id
and a.part_1=c.part_1

org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
    at org.apache.spark.SparkException$.internalError(SparkException.scala:88)
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:560)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:572)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
    at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.zeppelin.spark.SparkSqlInterpreter.internalInterpret(SparkSqlInterpreter.java:106)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:647)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:447)
    at scala.collection.immutable.List.map(List.scala:293)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:444)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:426)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.org$apache$spark$sql$execution$exchange$BaseEnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:205)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:679)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:654)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:612)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:612)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:605)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1322)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1321)
    at org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:43)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:605)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:581)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:654)
    at org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:69)
    at org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.$anonfun$applyAll$1(RuleHelper.scala:28)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.applyAll(RuleHelper.scala:27)
    at org.apache.spark.sql.execution.adaptive.PreprocessingRule$.$anonfun$apply$2(PreprocessingRule.scala:60)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.$anonfun$doExecute$1(InsertAdaptiveSparkPlan.scala:259)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.doExecute(InsertAdaptiveSparkPlan.scala:258)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.execute(InsertAdaptiveSparkPlan.scala:229)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.preprocessAndCreateAdaptivePlan(InsertAdaptiveSparkPlan.scala:175)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:132)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.$anonfun$applyInternal$1(InsertAdaptiveSparkPlan.scala:125)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:125)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
    at org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:495)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:494)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
    ... 58 more
~~~
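
A possible workaround is to split the three-way join into two two-way joins, so that `EnsureRequirements` only ever has to reconcile two key-grouped children per join. A sketch reusing the `ice_joined_1` table created above (this is my assumption about the assertion in `createKeyGroupedShuffleSpec`; I have not confirmed it avoids the failure in every case):

~~~sql
-- pairwise joins only: join the already-materialized ice_joined_1 with ice3
create table ice_joined_2 using iceberg
partitioned by (part_1)
select a.id, c1, c2, c3, c4, c5, c6, a.part_1
from ice_joined_1 a
inner join ice3 c
on a.id=c.id -- main join key
and a.part_1=c.part_1 -- join on all sub-partition fields to enable SPJ
;
~~~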