Copilot commented on code in PR #9588: URL: https://github.com/apache/gravitino/pull/9588#discussion_r2660507166
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ########## @@ -0,0 +1,348 @@
+      System.out.println("Rewrite data files job completed successfully");
+    } catch (Exception e) {
+      System.err.println("Error executing rewrite data files job: " + e.getMessage());

Review Comment:
The exception handler catches the generic Exception and prints the error message with e.getMessage(), which may return null for some exception types. This could result in printing "Error executing rewrite data files job: null", which is not helpful for debugging. Consider checking whether the message is null and providing more context, using a more specific exception type, or always including the exception class name in the error output.

```suggestion
      String errorMessage =
          e.getMessage() != null ? e.getMessage() : "no error message available";
      System.err.println(
          "Error executing rewrite data files job ("
              + e.getClass().getName()
              + "): "
              + errorMessage);
```
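A minimal, self-contained sketch (illustrative only, not part of the PR) of a related alternative: concatenating the exception object itself relies on Throwable#toString(), which always includes the exception class name, so the output never degrades to a bare "null".

```java
// Sketch only: Throwable#toString() returns "<class name>: <message>", or just the class
// name when the message is null, so the printed line always identifies the failure type.
public class ErrorMessageSketch {
  public static void main(String[] args) {
    try {
      throw new IllegalStateException(); // deliberately constructed without a message
    } catch (Exception e) {
      // Prints "java.lang.IllegalStateException" rather than "...: null"
      System.err.println("Error executing rewrite data files job: " + e);
    }
  }
}
```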
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  static Map<String, String> parseArguments(String[] args) {
+    Map<String, String> argMap = new HashMap<>();
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].startsWith("--")) {
+        String key = args[i].substring(2); // Remove "--" prefix
+
+        // Check if there's a value for this key
+        if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
+          String value = args[i + 1];
+          // Only add non-empty values
+          if (value != null && !value.trim().isEmpty()) {
+            argMap.put(key, value);
+          }
+          i++; // Skip the value in next iteration
+        } else {
+          System.err.println("Warning: Flag " + args[i] + " has no value, ignoring");
+        }
+      }
+    }
+
+    return argMap;
+  }

Review Comment:
The parseArguments method modifies the loop counter 'i' inside the loop body, which can be error-prone and difficult to maintain. This pattern makes the control flow less clear and could lead to subtle bugs if the code is modified later. Consider using a while loop with explicit index management or extracting the argument pair parsing logic into a separate method to improve clarity and maintainability.
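A short sketch of the while-loop variant the comment suggests, kept behaviorally close to the quoted parseArguments (same warning for value-less flags, same skipping of blank values); the class and method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a while loop with a single, explicit cursor, so each iteration consumes either a
// "--flag value" pair or one stray token, instead of mutating the for-loop counter mid-body.
public class ArgumentParserSketch {
  static Map<String, String> parseArguments(String[] args) {
    Map<String, String> argMap = new HashMap<>();
    int i = 0;
    while (i < args.length) {
      if (args[i].startsWith("--") && i + 1 < args.length && !args[i + 1].startsWith("--")) {
        String key = args[i].substring(2);
        String value = args[i + 1];
        if (!value.trim().isEmpty()) {
          argMap.put(key, value);
        }
        i += 2; // consume the flag and its value
      } else {
        if (args[i].startsWith("--")) {
          System.err.println("Warning: Flag " + args[i] + " has no value, ignoring");
        }
        i += 1; // consume a lone flag or stray token
      }
    }
    return argMap;
  }
}
```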
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+    // Optional arguments
+    String strategy = argMap.get("strategy");
+    String sortOrder = argMap.get("sort-order");
+    String whereClause = argMap.get("where");
+    String optionsJson = argMap.get("options");

Review Comment:
The main method does not validate the strategy parameter value. According to the Iceberg documentation, valid values are 'binpack' and 'sort' (z-order is expressed through the sort strategy), but the code accepts any string value. If an invalid strategy is provided, the error will only be discovered at runtime when Iceberg's stored procedure is called, resulting in a less helpful error message. Consider adding input validation to check that the strategy, if provided, is one of the valid values, and provide a clear error message if it is not.
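A possible up-front check, sketched under the assumption that only the values Iceberg's rewrite_data_files procedure accepts for strategy — binpack and sort — should pass (z-order being requested via the sort strategy plus a zorder(...) sort order); names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Sketch: fail fast with a clear message instead of letting the stored procedure reject the
// value later. "binpack" and "sort" are the strategies rewrite_data_files understands.
public class StrategyValidationSketch {
  private static final List<String> VALID_STRATEGIES = Arrays.asList("binpack", "sort");

  static void validateStrategy(String strategy) {
    if (strategy != null && !VALID_STRATEGIES.contains(strategy.toLowerCase(Locale.ROOT))) {
      throw new IllegalArgumentException(
          "Invalid --strategy '" + strategy + "', expected one of " + VALID_STRATEGIES);
    }
  }
}
```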
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  static Map<String, String> parseOptionsJson(String optionsJson) {
+    Map<String, String> options = new HashMap<>();
+    if (optionsJson == null || optionsJson.isEmpty()) {
+      return options;
+    }
+
+    String cleaned =
+        optionsJson.trim().replaceAll("^\\{", "").replaceAll("\\}$", "").replaceAll("\"", "");
+    if (cleaned.isEmpty()) {
+      return options;
+    }
+
+    String[] pairs = cleaned.split(",");
+    for (String pair : pairs) {
+      String[] keyValue = pair.split(":", 2);
+      if (keyValue.length == 2) {
+        options.put(keyValue[0].trim(), keyValue[1].trim());
+      }
+    }
+    return options;
+  }

Review Comment:
The parseOptionsJson method uses a simplistic string parsing approach that is fragile and cannot handle JSON values containing colons or commas. For example, a value like "path:/data/file,backup" would be incorrectly parsed. Additionally, the regex pattern replaceAll("\"", "") would fail on JSON strings that contain escaped quotes. Consider using a proper JSON parser library like Jackson or Gson to safely and correctly parse the JSON options.
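A sketch of the Jackson-based parsing the comment suggests; it assumes com.fasterxml.jackson.databind is available on the job's classpath (Spark distributions bundle Jackson, although the exact version may vary).

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.Map;

// Sketch: delegate JSON handling to Jackson so quoting, escaping, and values containing
// colons or commas are handled correctly, and malformed input fails with a clear error.
public class OptionsJsonSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static Map<String, String> parseOptionsJson(String optionsJson) {
    if (optionsJson == null || optionsJson.trim().isEmpty()) {
      return Collections.emptyMap();
    }
    try {
      return MAPPER.readValue(optionsJson, new TypeReference<Map<String, String>>() {});
    } catch (Exception e) {
      throw new IllegalArgumentException("Invalid --options JSON: " + optionsJson, e);
    }
  }
}
```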
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  static String buildProcedureCall(
+      String catalogName,
+      String tableIdentifier,
+      String strategy,
+      String sortOrder,
+      String whereClause,
+      String optionsJson) {
+
+    StringBuilder sql = new StringBuilder();
+    sql.append("CALL ").append(catalogName).append(".system.rewrite_data_files(");
+    sql.append("table => '").append(tableIdentifier).append("'");
+
+    if (strategy != null && !strategy.isEmpty()) {
+      sql.append(", strategy => '").append(strategy).append("'");
+    }

Review Comment:
The buildProcedureCall method directly concatenates user-provided input (catalogName, tableIdentifier, strategy, sortOrder, whereClause) into SQL without proper escaping or validation. This creates a potential SQL injection vulnerability. User input containing single quotes or SQL metacharacters could break out of the string literals and inject malicious SQL code. Consider using parameterized queries or properly escaping single quotes by replacing them with doubled single quotes ('') in all string values before concatenation.
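A minimal sketch of the quote-doubling escape mentioned in the comment. It only covers values placed inside SQL string literals; identifier-position values such as the catalog name would still need separate validation (for example against a known catalog list), which is assumed rather than shown here.

```java
// Sketch: in a SQL string literal a single quote is escaped by doubling it, so values are
// passed through this helper before being concatenated into the CALL statement.
public class SqlLiteralEscapeSketch {
  static String escapeSqlLiteral(String value) {
    return value.replace("'", "''"); // it's -> it''s
  }

  public static void main(String[] args) {
    String tableIdentifier = "db.o'clock_events";
    String sql =
        "CALL my_catalog.system.rewrite_data_files(table => '"
            + escapeSqlLiteral(tableIdentifier)
            + "')";
    // Prints: CALL my_catalog.system.rewrite_data_files(table => 'db.o''clock_events')
    System.out.println(sql);
  }
}
```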
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      printUsage();
+      System.exit(1);
+    }
+

Review Comment:
The argument length check 'args.length < 4' assumes that at minimum there will be 4 arguments (--catalog value --table value), but this validation is redundant since the subsequent null checks on catalogName and tableIdentifier already handle missing required arguments. Additionally, the hardcoded number 4 is a magic number that reduces code maintainability. Consider removing this check and relying solely on the null validation, or adding a constant that documents why 4 is the minimum expected length.

```suggestion
```
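If the early length check is kept, the magic number could be replaced with a named constant along these lines (a sketch, not the author's code; the names are illustrative).

```java
// Sketch: document why 4 is the minimum token count instead of leaving it as a magic number.
public class ArgCountSketch {
  // --catalog <value> --table <value> are both required, i.e. at least four tokens.
  private static final int MIN_REQUIRED_ARGS = 4;

  static boolean hasEnoughArguments(String[] args) {
    return args.length >= MIN_REQUIRED_ARGS;
  }
}
```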
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+      System.out.println("Executing Iceberg rewrite_data_files procedure: " + sql);

Review Comment:
The main method logs the complete SQL statement including user input to System.out on line 106. If the SQL contains sensitive information in the where clause or options (e.g., filtering by user IDs, email addresses, or configuration containing credentials), this could lead to unintended information disclosure in log files. Consider logging only non-sensitive parts of the query or sanitizing the output, or documenting that sensitive data should not be passed through these parameters.

```suggestion
      System.out.println(
          "Executing Iceberg rewrite_data_files procedure for catalog '"
              + catalogName
              + "', table '"
              + tableIdentifier
              + "'");
```
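As an alternative to dropping the SQL from the log entirely, a regex-based redaction sketch (illustrative only; it masks single-quoted literals and nothing else).

```java
// Sketch: replace every single-quoted literal with '***' before logging, so the procedure
// name and argument names remain visible while user-supplied values are hidden.
public class SqlRedactionSketch {
  static String redactLiterals(String sql) {
    return sql.replaceAll("'[^']*'", "'***'");
  }

  public static void main(String[] args) {
    String sql = "CALL c.system.rewrite_data_files(table => 'db.t', where => 'user_id = 42')";
    // Prints: CALL c.system.rewrite_data_files(table => '***', where => '***')
    System.out.println(redactLiterals(sql));
  }
}
```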
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  /**
+   * Build the SQL CALL statement for the rewrite_data_files procedure.
+   *
+   * @param catalogName Iceberg catalog name
+   * @param tableIdentifier Fully qualified table name
+   * @param strategy Rewrite strategy (binpack or sort)
+   * @param sortOrder Sort order specification
+   * @param whereClause Filter predicate
+   * @param optionsJson JSON map of options
+   * @return SQL CALL statement
+   */

Review Comment:
The Javadoc for buildProcedureCall states the strategy parameter should be 'binpack or sort', but based on the PR description and implementation, 'z-order' should also be mentioned (achieved through the 'sort' strategy with the zorder function). The documentation is incomplete and inconsistent with the actual functionality. Update the Javadoc to accurately document all supported strategy values.
########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java: ##########
+  /**
+   * Main entry point for the rewrite data files job.
+   *
+   * <p>Uses named arguments for flexibility: --catalog <catalog_name> Required. Iceberg
+   * catalog name --table <table_identifier> Required. Table name (db.table) --strategy
+   * <strategy> Optional. binpack or sort --sort-order <sort_order> Optional. Sort order
+   * specification --where <where_clause> Optional. Filter predicate --options
+   * <options_json> Optional. JSON map of options
+   *
+   * <p>Example: --catalog iceberg_catalog --table db.sample --strategy binpack --options
+   * '{"min-input-files":"2"}'

Review Comment:
The Javadoc comment states the strategy can be 'binpack or sort', but the PR description and usage examples mention support for 'z-order' as well. The documentation is inconsistent about whether 'z-order' is a separate strategy or is achieved using the 'sort' strategy with 'zorder(...)' in the sort-order parameter. This inconsistency could confuse users. Consider clarifying the documentation to accurately reflect the supported strategies and how z-order is specified.

```suggestion
   * <p>Uses named arguments for flexibility:
   * --catalog <catalog_name> Required. Iceberg catalog name
   * --table <table_identifier> Required. Table name (db.table)
   * --strategy <strategy> Optional. Rewrite strategy: {@code binpack} or {@code sort}. Z-order
   * layouts are achieved by using {@code sort} together with a {@code zorder(...)} expression
   * in {@code --sort-order}; {@code zorder} is not a separate strategy value.
   * --sort-order <sort_order> Optional. Sort or z-order specification passed to Iceberg when
   * using the {@code sort} strategy, for example {@code "zorder(col1, col2)"}.
   * --where <where_clause> Optional. Filter predicate
   * --options <options_json> Optional. JSON map of options
   *
   * <p>Examples:
   * <ul>
   *   <li>Binpack rewrite:
   *       {@code --catalog iceberg_catalog --table db.sample --strategy binpack --options
   *       '{"min-input-files":"2"}'}
   *   <li>Z-order rewrite using sort strategy:
   *       {@code --catalog iceberg_catalog --table db.sample --strategy sort --sort-order
   *       "zorder(col1, col2)"}
   * </ul>
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
