hemanthboyina commented on issue #15610:
URL: https://github.com/apache/iceberg/issues/15610#issuecomment-4055467002

   I tried reproducing the issue and, like @moomindani, I am unable to reproduce it. Is 
there anything I am missing here, @andyguwc?
   
   `@TestTemplate
     public void testMergeAfterBucketEvolutionAndRewrite() {
       sql(
           "CREATE TABLE %s (id INT, salary INT, dep STRING)"
               + "USING iceberg "
               + "PARTITIONED BY (bucket(4, id))"
               + "TBLPROPERTIES (%s)",
           tableName, tablePropsAsString(COMMON_TABLE_PROPERTIES));
   
       append(tableName, "{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }");
       append(tableName, "{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }");
       append(tableName, "{ \"id\": 3, \"salary\": 300, \"dep\": \"hardware\" 
}");
   
       Table table = validationCatalog.loadTable(tableIdent);
   
       // Evolve: drop bucket(4, id), add bucket(64, id)
       table.updateSpec().removeField(Expressions.bucket("id", 4)).commit();
       table.updateSpec().addField(Expressions.bucket("id", 64)).commit();
   
       // Rewrite all data to new spec and expire old snapshots
       SparkActions.get(spark).rewriteDataFiles(table).option("rewrite-all", 
"true").execute();
       SparkActions.get(spark).expireSnapshots(table).retainLast(1).execute();
   
       // Create staging table with matching bucket(64, id)
       sql(
           "CREATE TABLE %s (id INT, salary INT, dep STRING)"
               + "USING iceberg "
               + "PARTITIONED BY (bucket(64, id))"
               + "TBLPROPERTIES (%s)",
           tableName(OTHER_TABLE_NAME), 
tablePropsAsString(COMMON_TABLE_PROPERTIES));
   
       append(tableName(OTHER_TABLE_NAME), "{ \"id\": 1, \"salary\": 150, 
\"dep\": \"hr\" }");
       append(tableName(OTHER_TABLE_NAME), "{ \"id\": 4, \"salary\": 400, 
\"dep\": \"software\" }");
   
       sql("REFRESH TABLE %s", tableName);
   
       Map<String, String> mergeTableProps =
           ImmutableMap.of(
               TableProperties.MERGE_MODE,
               MERGE_ON_READ.modeName(),
               TableProperties.MERGE_DISTRIBUTION_MODE,
               "none");
   
       sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", tableName, 
tablePropsAsString(mergeTableProps));
   
       withSQLConf(
           ENABLED_SPJ_SQL_CONF,
           () -> {
             SparkPlan plan =
                 executeAndKeepPlan(
                     "MERGE INTO %s AS t USING %s AS s "
                         + "ON t.id = s.id "
                         + "WHEN MATCHED THEN "
                         + "  UPDATE SET t.salary = s.salary "
                         + "WHEN NOT MATCHED THEN "
                         + "  INSERT *",
                     tableName, tableName(OTHER_TABLE_NAME));
             String planAsString = plan.toString();
             // SPJ should work — no Exchange shuffles for MoR merge
             assertThat(planAsString)
                 .as("MERGE INTO after bucket evolution + rewrite should use 
SPJ (no Exchange)")
                 .doesNotContain("Exchange");
           });
     }`
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to