Copilot commented on code in PR #9588:
URL: https://github.com/apache/gravitino/pull/9588#discussion_r2660507166


##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);
+
+      // Execute the procedure
+      Row[] results = (Row[]) spark.sql(sql).collect();
+
+      // Print results
+      if (results.length > 0) {
+        Row result = results[0];
+        // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+        if (result.size() >= 5) {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n"
+                  + "  Removed delete files: %d%n",
+              result.getInt(0),
+              result.getInt(1),
+              result.getLong(2),
+              result.getInt(3),
+              result.getInt(4));
+        } else {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n",
+              result.getInt(0), result.getInt(1), result.getLong(2), result.getInt(3));
+        }
+      }
+
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());

Review Comment:
   The exception handler catches generic Exception and prints the error message 
with e.getMessage(), which may return null for some exception types. This could 
result in printing "Error executing rewrite data files job: null" which is not 
helpful for debugging. Consider checking if the message is null and providing 
more context, or using a more specific exception type, or always including the 
exception class name in the error output.
   ```suggestion
         String errorMessage =
             e.getMessage() != null ? e.getMessage() : "no error message available";
         System.err.println(
             "Error executing rewrite data files job ("
                 + e.getClass().getName()
                 + "): "
                 + errorMessage);
   ```



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);
+
+      // Execute the procedure
+      Row[] results = (Row[]) spark.sql(sql).collect();
+
+      // Print results
+      if (results.length > 0) {
+        Row result = results[0];
+        // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+        if (result.size() >= 5) {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n"
+                  + "  Removed delete files: %d%n",
+              result.getInt(0),
+              result.getInt(1),
+              result.getLong(2),
+              result.getInt(3),
+              result.getInt(4));
+        } else {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n",
+              result.getInt(0), result.getInt(1), result.getLong(2), result.getInt(3));
+        }
+      }
+
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Build the SQL CALL statement for the rewrite_data_files procedure.
+   *
+   * @param catalogName Iceberg catalog name
+   * @param tableIdentifier Fully qualified table name
+   * @param strategy Rewrite strategy (binpack or sort)
+   * @param sortOrder Sort order specification
+   * @param whereClause Filter predicate
+   * @param optionsJson JSON map of options
+   * @return SQL CALL statement
+   */
+  static String buildProcedureCall(
+      String catalogName,
+      String tableIdentifier,
+      String strategy,
+      String sortOrder,
+      String whereClause,
+      String optionsJson) {
+
+    StringBuilder sql = new StringBuilder();
+    sql.append("CALL ").append(catalogName).append(".system.rewrite_data_files(");
+    sql.append("table => '").append(tableIdentifier).append("'");
+
+    if (strategy != null && !strategy.isEmpty()) {
+      sql.append(", strategy => '").append(strategy).append("'");
+    }
+
+    if (sortOrder != null && !sortOrder.isEmpty()) {
+      sql.append(", sort_order => '").append(sortOrder).append("'");
+    }
+
+    if (whereClause != null && !whereClause.isEmpty()) {
+      sql.append(", where => '").append(whereClause).append("'");
+    }
+
+    if (optionsJson != null && !optionsJson.isEmpty()) {
+      // Parse JSON and convert to map syntax for Iceberg procedure
+      Map<String, String> options = parseOptionsJson(optionsJson);
+      if (!options.isEmpty()) {
+        sql.append(", options => map(");
+        boolean first = true;
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+          if (!first) {
+            sql.append(", ");
+          }
+          sql.append("'")
+              .append(entry.getKey())
+              .append("', '")
+              .append(entry.getValue())
+              .append("'");
+          first = false;
+        }
+        sql.append(")");
+      }
+    }
+
+    sql.append(")");
+    return sql.toString();
+  }
+
+  /**
+   * Parse command line arguments in --key value format.
+   *
+   * @param args command line arguments
+   * @return map of argument names to values
+   */
+  static Map<String, String> parseArguments(String[] args) {
+    Map<String, String> argMap = new HashMap<>();
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].startsWith("--")) {
+        String key = args[i].substring(2); // Remove "--" prefix
+
+        // Check if there's a value for this key
+        if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
+          String value = args[i + 1];
+          // Only add non-empty values
+          if (value != null && !value.trim().isEmpty()) {
+            argMap.put(key, value);
+          }
+          i++; // Skip the value in next iteration
+        } else {
+          System.err.println("Warning: Flag " + args[i] + " has no value, ignoring");
+        }
+      }
+    }
+
+    return argMap;
+  }

Review Comment:
   The parseArguments method modifies the loop counter 'i' inside the loop 
body, which can be error-prone and difficult to maintain. This pattern makes 
the control flow less clear and could lead to subtle bugs if the code is 
modified later. Consider using a while loop with explicit index management or 
extracting the argument pair parsing logic into a separate method to improve 
clarity and maintainability.
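   A minimal sketch of that refactor, using a while loop so the index advance is explicit (same method name and java.util imports as in this file; illustrative only, not the author's implementation):
   ```java
     static Map<String, String> parseArguments(String[] args) {
       Map<String, String> argMap = new HashMap<>();
       int i = 0;
       while (i < args.length) {
         if (!args[i].startsWith("--")) {
           i++; // skip stray tokens that are not flags
           continue;
         }
         String key = args[i].substring(2); // remove "--" prefix
         // A flag consumes a value only when the next token is not another flag.
         if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
           String value = args[i + 1];
           if (!value.trim().isEmpty()) {
             argMap.put(key, value);
           }
           i += 2; // consume flag and value together
         } else {
           System.err.println("Warning: Flag " + args[i] + " has no value, ignoring");
           i += 1;
         }
       }
       return argMap;
     }
   ```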



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");

Review Comment:
   The main method does not validate the strategy parameter value. According to 
the documentation, valid values are 'binpack', 'sort', and 'z-order', but the 
code accepts any string value. If an invalid strategy is provided, the error 
will only be discovered at runtime when Iceberg's stored procedure is called, 
resulting in a less helpful error message. Consider adding input validation to 
check that the strategy, if provided, is one of the valid values and provide a 
clear error message if it's not.
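   One possible shape of that validation, placed right after the optional arguments are read (a sketch; it assumes the procedure's accepted strategies are "binpack" and "sort", with z-order expressed through --sort-order):
   ```java
       // Hypothetical guard: fail fast on an unknown strategy instead of deferring to Iceberg.
       if (strategy != null
           && !strategy.equalsIgnoreCase("binpack")
           && !strategy.equalsIgnoreCase("sort")) {
         System.err.println(
             "Error: invalid --strategy '" + strategy + "', expected 'binpack' or 'sort'");
         printUsage();
         System.exit(1);
       }
   ```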



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);
+
+      // Execute the procedure
+      Row[] results = (Row[]) spark.sql(sql).collect();
+
+      // Print results
+      if (results.length > 0) {
+        Row result = results[0];
+        // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+        if (result.size() >= 5) {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n"
+                  + "  Removed delete files: %d%n",
+              result.getInt(0),
+              result.getInt(1),
+              result.getLong(2),
+              result.getInt(3),
+              result.getInt(4));
+        } else {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n",
+              result.getInt(0), result.getInt(1), result.getLong(2), result.getInt(3));
+        }
+      }
+
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Build the SQL CALL statement for the rewrite_data_files procedure.
+   *
+   * @param catalogName Iceberg catalog name
+   * @param tableIdentifier Fully qualified table name
+   * @param strategy Rewrite strategy (binpack or sort)
+   * @param sortOrder Sort order specification
+   * @param whereClause Filter predicate
+   * @param optionsJson JSON map of options
+   * @return SQL CALL statement
+   */
+  static String buildProcedureCall(
+      String catalogName,
+      String tableIdentifier,
+      String strategy,
+      String sortOrder,
+      String whereClause,
+      String optionsJson) {
+
+    StringBuilder sql = new StringBuilder();
+    sql.append("CALL ").append(catalogName).append(".system.rewrite_data_files(");
+    sql.append("table => '").append(tableIdentifier).append("'");
+
+    if (strategy != null && !strategy.isEmpty()) {
+      sql.append(", strategy => '").append(strategy).append("'");
+    }
+
+    if (sortOrder != null && !sortOrder.isEmpty()) {
+      sql.append(", sort_order => '").append(sortOrder).append("'");
+    }
+
+    if (whereClause != null && !whereClause.isEmpty()) {
+      sql.append(", where => '").append(whereClause).append("'");
+    }
+
+    if (optionsJson != null && !optionsJson.isEmpty()) {
+      // Parse JSON and convert to map syntax for Iceberg procedure
+      Map<String, String> options = parseOptionsJson(optionsJson);
+      if (!options.isEmpty()) {
+        sql.append(", options => map(");
+        boolean first = true;
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+          if (!first) {
+            sql.append(", ");
+          }
+          sql.append("'")
+              .append(entry.getKey())
+              .append("', '")
+              .append(entry.getValue())
+              .append("'");
+          first = false;
+        }
+        sql.append(")");
+      }
+    }
+
+    sql.append(")");
+    return sql.toString();
+  }
+
+  /**
+   * Parse command line arguments in --key value format.
+   *
+   * @param args command line arguments
+   * @return map of argument names to values
+   */
+  static Map<String, String> parseArguments(String[] args) {
+    Map<String, String> argMap = new HashMap<>();
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].startsWith("--")) {
+        String key = args[i].substring(2); // Remove "--" prefix
+
+        // Check if there's a value for this key
+        if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
+          String value = args[i + 1];
+          // Only add non-empty values
+          if (value != null && !value.trim().isEmpty()) {
+            argMap.put(key, value);
+          }
+          i++; // Skip the value in next iteration
+        } else {
+          System.err.println("Warning: Flag " + args[i] + " has no value, ignoring");
+        }
+      }
+    }
+
+    return argMap;
+  }
+
+  /** Print usage information. */
+  private static void printUsage() {
+    System.err.println(
+        "Usage: IcebergRewriteDataFilesJob [OPTIONS]\n"
+            + "\n"
+            + "Required Options:\n"
+            + "  --catalog <name>          Iceberg catalog name registered in Spark\n"
+            + "  --table <identifier>      Fully qualified table name (e.g., db.table_name)\n"
+            + "\n"
+            + "Optional Options:\n"
+            + "  --strategy <name>         Rewrite strategy: binpack (default) or sort\n"
+            + "  --sort-order <spec>       Sort order specification:\n"
+            + "                              For columns: 'id DESC NULLS LAST, name ASC'\n"
+            + "                              For Z-Order: 'zorder(c1,c2,c3)'\n"
+            + "  --where <predicate>       Filter predicate to select files\n"
+            + "                              Example: 'year = 2024 and month = 1'\n"
+            + "  --options <json>          JSON map of Iceberg rewrite options\n"
+            + "                              Example: '{\"min-input-files\":\"2\"}'\n"
+            + "\n"
+            + "Examples:\n"
+            + "  # Basic binpack\n"
+            + "  --catalog iceberg_prod --table db.sample\n"
+            + "\n"
+            + "  # Sort by columns\n"
+            + "  --catalog iceberg_prod --table db.sample --strategy sort \\\n"
+            + "    --sort-order 'id DESC NULLS LAST'\n"
+            + "\n"
+            + "  # With filter and options\n"
+            + "  --catalog iceberg_prod --table db.sample --where 'year = 2024' \\\n"
+            + "    --options '{\"min-input-files\":\"2\",\"remove-dangling-deletes\":\"true\"}'");
+  }
+
+  /**
+   * Parse options from JSON string. Expected format: {"key1": "value1", "key2": "value2"}
+   *
+   * @param optionsJson JSON string
+   * @return map of option keys to values
+   */
+  static Map<String, String> parseOptionsJson(String optionsJson) {
+    Map<String, String> options = new HashMap<>();
+    if (optionsJson == null || optionsJson.isEmpty()) {
+      return options;
+    }
+
+    String cleaned =
+        optionsJson.trim().replaceAll("^\\{", "").replaceAll("\\}$", "").replaceAll("\"", "");
+    if (cleaned.isEmpty()) {
+      return options;
+    }
+
+    String[] pairs = cleaned.split(",");
+    for (String pair : pairs) {
+      String[] keyValue = pair.split(":", 2);
+      if (keyValue.length == 2) {
+        options.put(keyValue[0].trim(), keyValue[1].trim());
+      }
+    }
+    return options;
+  }

Review Comment:
   The parseOptionsJson method uses a simplistic string parsing approach that 
is fragile and cannot handle JSON values containing colons or commas. For 
example, a value like "path:/data/file,backup" would be incorrectly parsed. 
Additionally, the regex pattern replaceAll("\"", "") would fail on JSON strings 
that contain escaped quotes. Consider using a proper JSON parser library like 
Jackson or Gson to safely and correctly parse the JSON options.
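   A sketch of the Jackson-based alternative (assumes jackson-databind is available on the job's classpath, which this patch does not show):
   ```java
     // Requires jackson-databind, e.g.:
     //   import com.fasterxml.jackson.core.type.TypeReference;
     //   import com.fasterxml.jackson.databind.ObjectMapper;
     static Map<String, String> parseOptionsJson(String optionsJson) {
       if (optionsJson == null || optionsJson.trim().isEmpty()) {
         return new HashMap<>();
       }
       try {
         // Jackson handles quoting, escapes, and values containing ':' or ',' correctly.
         return new ObjectMapper()
             .readValue(optionsJson, new TypeReference<Map<String, String>>() {});
       } catch (Exception e) {
         throw new IllegalArgumentException("Invalid --options JSON: " + optionsJson, e);
       }
     }
   ```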



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);
+
+      // Execute the procedure
+      Row[] results = (Row[]) spark.sql(sql).collect();
+
+      // Print results
+      if (results.length > 0) {
+        Row result = results[0];
+        // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+        if (result.size() >= 5) {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n"
+                  + "  Removed delete files: %d%n",
+              result.getInt(0),
+              result.getInt(1),
+              result.getLong(2),
+              result.getInt(3),
+              result.getInt(4));
+        } else {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n",
+              result.getInt(0), result.getInt(1), result.getLong(2), result.getInt(3));
+        }
+      }
+
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Build the SQL CALL statement for the rewrite_data_files procedure.
+   *
+   * @param catalogName Iceberg catalog name
+   * @param tableIdentifier Fully qualified table name
+   * @param strategy Rewrite strategy (binpack or sort)
+   * @param sortOrder Sort order specification
+   * @param whereClause Filter predicate
+   * @param optionsJson JSON map of options
+   * @return SQL CALL statement
+   */
+  static String buildProcedureCall(
+      String catalogName,
+      String tableIdentifier,
+      String strategy,
+      String sortOrder,
+      String whereClause,
+      String optionsJson) {
+
+    StringBuilder sql = new StringBuilder();
+    sql.append("CALL ").append(catalogName).append(".system.rewrite_data_files(");
+    sql.append("table => '").append(tableIdentifier).append("'");
+
+    if (strategy != null && !strategy.isEmpty()) {
+      sql.append(", strategy => '").append(strategy).append("'");
+    }
+
+    if (sortOrder != null && !sortOrder.isEmpty()) {
+      sql.append(", sort_order => '").append(sortOrder).append("'");
+    }
+
+    if (whereClause != null && !whereClause.isEmpty()) {
+      sql.append(", where => '").append(whereClause).append("'");
+    }
+
+    if (optionsJson != null && !optionsJson.isEmpty()) {
+      // Parse JSON and convert to map syntax for Iceberg procedure
+      Map<String, String> options = parseOptionsJson(optionsJson);
+      if (!options.isEmpty()) {
+        sql.append(", options => map(");
+        boolean first = true;
+        for (Map.Entry<String, String> entry : options.entrySet()) {
+          if (!first) {
+            sql.append(", ");
+          }
+          sql.append("'")
+              .append(entry.getKey())
+              .append("', '")
+              .append(entry.getValue())
+              .append("'");
+          first = false;
+        }
+        sql.append(")");
+      }
+    }
+
+    sql.append(")");
+    return sql.toString();
+  }

Review Comment:
   The buildProcedureCall method directly concatenates user-provided input 
(catalogName, tableIdentifier, strategy, sortOrder, whereClause) into SQL 
without proper escaping or validation. This creates a potential SQL injection 
vulnerability. User input containing single quotes or SQL metacharacters could 
break out of the string literals and inject malicious SQL code. Consider using 
parameterized queries or properly escaping single quotes by replacing them with 
doubled single quotes ('') in all string values before concatenation.
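   A minimal escaping helper along those lines (a sketch, not part of this patch; it doubles single quotes before a value is embedded in a quoted literal):
   ```java
     // Hypothetical helper: escape a value for use inside a single-quoted SQL literal.
     private static String escapeSqlLiteral(String value) {
       return value.replace("'", "''");
     }

     // Used when building the CALL statement, e.g.:
     //   sql.append("table => '").append(escapeSqlLiteral(tableIdentifier)).append("'");
   ```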



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+

Review Comment:
   The argument length check 'args.length < 4' assumes that at minimum there 
will be 4 arguments (--catalog value --table value), but this validation is 
redundant since the subsequent null checks on catalogName and tableIdentifier 
already handle missing required arguments. Additionally, the hardcoded number 4 
is a magic number that reduces code maintainability. Consider removing this 
check and relying solely on the null validation, or add a constant that 
documents why 4 is the minimum expected length.
   ```suggestion
   
   ```
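   Alternatively, if the early check is kept, a named constant documents the expectation (illustrative only):
   ```java
     // --catalog <value> --table <value> requires at least four tokens.
     private static final int MIN_REQUIRED_ARGS = 4;

     // ... and in main():
     if (args.length < MIN_REQUIRED_ARGS) {
       printUsage();
       System.exit(1);
     }
   ```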



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);

Review Comment:
   The main method logs the complete SQL statement including user input to 
System.out on line 106. If the SQL contains sensitive information in the where 
clause or options (e.g., filtering by user IDs, email addresses, or 
configuration containing credentials), this could lead to unintended 
information disclosure in log files. Consider logging only non-sensitive parts 
of the query or sanitizing the output, or documenting that sensitive data 
should not be passed through these parameters.
   ```suggestion
         System.out.println(
             "Executing Iceberg rewrite_data_files procedure for catalog '"
                 + catalogName
                 + "', table '"
                 + tableIdentifier
                 + "'");
   ```



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'
+   */
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+
+    // Parse named arguments
+    Map<String, String> argMap = parseArguments(args);
+
+    // Validate required arguments
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in Iceberg Rewrite Data Files")
+            .getOrCreate();
+
+    try {
+      // Build the procedure call SQL
+      String sql =
+          buildProcedureCall(
+              catalogName, tableIdentifier, strategy, sortOrder, whereClause, optionsJson);
+
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);
+
+      // Execute the procedure
+      Row[] results = (Row[]) spark.sql(sql).collect();
+
+      // Print results
+      if (results.length > 0) {
+        Row result = results[0];
+        // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+        if (result.size() >= 5) {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n"
+                  + "  Removed delete files: %d%n",
+              result.getInt(0),
+              result.getInt(1),
+              result.getLong(2),
+              result.getInt(3),
+              result.getInt(4));
+        } else {
+          System.out.printf(
+              "Rewrite Data Files Results:%n"
+                  + "  Rewritten data files: %d%n"
+                  + "  Added data files: %d%n"
+                  + "  Rewritten bytes: %d%n"
+                  + "  Failed data files: %d%n",
+              result.getInt(0), result.getInt(1), result.getLong(2), result.getInt(3));
+        }
+      }
+
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Build the SQL CALL statement for the rewrite_data_files procedure.
+   *
+   * @param catalogName Iceberg catalog name
+   * @param tableIdentifier Fully qualified table name
+   * @param strategy Rewrite strategy (binpack or sort)
+   * @param sortOrder Sort order specification
+   * @param whereClause Filter predicate
+   * @param optionsJson JSON map of options
+   * @return SQL CALL statement
+   */

Review Comment:
   The Javadoc for buildProcedureCall states the strategy parameter should be 
'binpack or sort', but based on the PR description and implementation, 
'z-order' should also be mentioned as a valid strategy (achieved through 'sort' 
with zorder function). The documentation is incomplete and inconsistent with 
the actual functionality. Update the Javadoc to accurately document all 
supported strategy values.
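   For example, the strategy parameter's Javadoc could read something like the following (wording is only a suggestion):
   ```java
      * @param strategy Rewrite strategy: "binpack" or "sort"; z-order layouts are requested by
      *     combining the "sort" strategy with a zorder(...) expression in the sort order
   ```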



##########
maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data file layout through
+ * binpack, sort, or zorder strategies.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+  private static final String VERSION = "v1";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in Iceberg rewrite data files job template for table optimization")
+        .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+        .withClassName(IcebergRewriteDataFilesJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog &lt;catalog_name&gt; Required. Iceberg
+   * catalog name --table &lt;table_identifier&gt; Required. Table name (db.table) --strategy
+   * &lt;strategy&gt; Optional. binpack or sort --sort-order &lt;sort_order&gt; Optional. Sort order
+   * specification --where &lt;where_clause&gt; Optional. Filter predicate --options
+   * &lt;options_json&gt; Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'

Review Comment:
   The Javadoc comment states the strategy can be 'binpack or sort' but the PR 
description and usage examples mention support for 'z-order' as well. The 
documentation is inconsistent about whether 'z-order' is a separate strategy or 
if it's achieved using 'sort' strategy with 'zorder(...)' in the sort-order 
parameter. This inconsistency could confuse users. Consider clarifying the 
documentation to accurately reflect the supported strategies and how z-order is 
specified.
   ```suggestion
      * <p>Uses named arguments for flexibility:
      * --catalog &lt;catalog_name&gt; Required. Iceberg catalog name
      * --table &lt;table_identifier&gt; Required. Table name (db.table)
      * --strategy &lt;strategy&gt; Optional. Rewrite strategy: {@code binpack} or {@code sort}. Z-order
      *     layouts are achieved by using {@code sort} together with a {@code zorder(...)} expression
      *     in {@code --sort-order}; {@code zorder} is not a separate strategy value.
      * --sort-order &lt;sort_order&gt; Optional. Sort or z-order specification passed to Iceberg when
      *     using the {@code sort} strategy, for example {@code "zorder(col1, col2)"}.
      * --where &lt;where_clause&gt; Optional. Filter predicate
      * --options &lt;options_json&gt; Optional. JSON map of options
      *
      * <p>Examples:
      * <ul>
      *   <li>Binpack rewrite:
      *       {@code --catalog iceberg_catalog --table db.sample --strategy binpack --options
      *       '{"min-input-files":"2"}'}
      *   <li>Z-order rewrite using sort strategy:
      *       {@code --catalog iceberg_catalog --table db.sample --strategy sort --sort-order
      *       "zorder(col1, col2)"}
      * </ul>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
