mrbrahman opened a new issue, #10450:
URL: https://github.com/apache/iceberg/issues/10450

   ### Apache Iceberg version
   
   None
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   SPJ works great when joining 2 tables. For e.g.
   
   ~~~scala
   // SPJ setup
   import org.apache.spark.sql.functions._
    
   val df = spark.range(0,1000000)
    
   val a = df.repartition(10)
       .withColumn("part_1", spark_partition_id)
   
   // create tables with different datasets, but partitioned the same way
   a.withColumn("c1", rand()).withColumn("c2", expr("uuid()"))
     .write.partitionBy("part_1").format("iceberg").saveAsTable("ice1")
   
   a.withColumn("c3", rand()).withColumn("c4", expr("uuid()"))
     .write.partitionBy("part_1").format("iceberg").saveAsTable("ice2")
   
   a.withColumn("c5", rand()).withColumn("c6", expr("uuid()"))
     .write.partitionBy("part_1").format("iceberg").saveAsTable("ice3")
   
   ~~~
   
   ~~~sql
   -- SPJ in action
   -- disable broadcast for testing
   set spark.sql.autoBroadcastJoinThreshold = -1;
   
   set spark.sql.sources.v2.bucketing.enabled=true;
   set `spark.sql.iceberg.planning.preserve-data-grouping`=true;
    
   -- for our case, need this too
   set spark.sql.requireAllClusterKeysForCoPartition=false;
   
   create table ice_joined_1 using iceberg
   partitioned by (part_1)
   select a.id, c1, c2, c3, c4, a.part_1
   from ice1 a
     inner join ice2 b
       on a.id=b.id   -- main join key
       and a.part_1=b.part_1  -- join on all sub-partition fields to enable SPJ
   ~~~
   
![image1](https://github.com/apache/iceberg/assets/16898939/46799a13-5456-4bdd-85f9-61c93c20f290)
   
   No shuffle! **SPJ works!**
   
   But as soon as I add a third table to the join:
   
   ~~~sql
   create table ice_joined_2 using iceberg
   partitioned by (part_1)
   select a.id, c1, c2, c3, c4, c5, c6, a.part_1
   from ice1 a
     inner join ice2 b
       on a.id=b.id   -- main join key
       and a.part_1=b.part_1  -- join on all sub-partition fields to enable SPJ
     inner join ice3 c
       on a.id=c.id   -- main join key
       and a.part_1=c.part_1  -- join on all sub-partition fields to enable SPJ
   ;
   ~~~
   ... and BAM
   ~~~
   Error happens in sql: 
   create table ice_joined_2 using iceberg
   partitioned by (part_1)
   
   
   
   select a.id, c1, c2, c3, c4, c5, c6, a.part_1
   from ice1 a
     inner join ice2 b
       on a.id=b.id   
       and a.part_1=b.part_1  
     inner join ice3 c
       on a.id=c.id   
       and a.part_1=c.part_1  
   
   org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase 
planning failed with an internal error. You hit a bug in Spark or the Spark 
plugins you use. Please, report this bug to the corresponding communities or 
vendors, and provide the full stack trace.
        at 
org.apache.spark.SparkException$.internalError(SparkException.scala:88)
        at 
org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:572)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
        at 
org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
        at 
org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
        at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
        at 
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
        at 
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
        at 
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at 
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.zeppelin.spark.SparkSqlInterpreter.internalInterpret(SparkSqlInterpreter.java:106)
        at 
org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:55)
        at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:860)
        at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
        at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
        at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
        at 
org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:42)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:208)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:647)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:447)
        at scala.collection.immutable.List.map(List.scala:293)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:444)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:426)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.org$apache$spark$sql$execution$exchange$BaseEnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:205)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:679)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:654)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:612)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:612)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:605)
        at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1322)
        at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1321)
        at 
org.apache.spark.sql.execution.ProjectExec.mapChildren(basicPhysicalOperators.scala:43)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:605)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:581)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:654)
        at 
org.apache.spark.sql.execution.exchange.BaseEnsureRequirements.apply(EnsureRequirements.scala:69)
        at 
org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.$anonfun$applyAll$1(RuleHelper.scala:28)
        at 
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at 
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at 
org.apache.spark.sql.execution.adaptive.RuleHelper$RuleSeq.applyAll(RuleHelper.scala:27)
        at 
org.apache.spark.sql.execution.adaptive.PreprocessingRule$.$anonfun$apply$2(PreprocessingRule.scala:60)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.$anonfun$doExecute$1(InsertAdaptiveSparkPlan.scala:259)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.doExecute(InsertAdaptiveSparkPlan.scala:258)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan$PreprocessingRuleExecutor.execute(InsertAdaptiveSparkPlan.scala:229)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.preprocessAndCreateAdaptivePlan(InsertAdaptiveSparkPlan.scala:175)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:132)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.$anonfun$applyInternal$1(InsertAdaptiveSparkPlan.scala:125)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.applyInternal(InsertAdaptiveSparkPlan.scala:125)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:115)
        at 
org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.apply(InsertAdaptiveSparkPlan.scala:48)
        at 
org.apache.spark.sql.execution.QueryExecution$.$anonfun$prepareForExecution$1(QueryExecution.scala:495)
        at 
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at 
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at 
org.apache.spark.sql.execution.QueryExecution$.prepareForExecution(QueryExecution.scala:494)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
        ... 58 more
   ~~~
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to