rangareddy commented on code in PR #659: URL: https://github.com/apache/incubator-xtable/pull/659#discussion_r1977488272
########## xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java: ########## @@ -98,37 +98,110 @@ public class RunSync { + "used for any Iceberg source or target.") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); - public static void main(String[] args) throws IOException { - CommandLineParser parser = new DefaultParser(); + public static SourceTable sourceTableBuilder( + DatasetConfig.Table table, + IcebergCatalogConfig icebergCatalogConfig, + DatasetConfig datasetConfig, + Properties sourceProperties) { + SourceTable sourceTable = + SourceTable.builder() + .name(table.getTableName()) + .basePath(table.getTableBasePath()) + .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\.")) + .dataPath(table.getTableDataPath()) + .catalogConfig(icebergCatalogConfig) + .additionalProperties(sourceProperties) + .formatName(datasetConfig.sourceFormat) + .build(); + return sourceTable; + } - CommandLine cmd; - try { - cmd = parser.parse(OPTIONS, args); - } catch (ParseException e) { - new HelpFormatter().printHelp("xtable.jar", OPTIONS, true); - return; - } + public static List<TargetTable> targetTableBuilder( + DatasetConfig.Table table, + IcebergCatalogConfig icebergCatalogConfig, + List<String> tableFormatList) { + List<TargetTable> targetTables = + tableFormatList.stream() + .map( + tableFormat -> + TargetTable.builder() + .name(table.getTableName()) + .basePath(table.getTableBasePath()) + .namespace( + table.getNamespace() == null ? 
null : table.getNamespace().split("\\.")) + .catalogConfig(icebergCatalogConfig) + .formatName(tableFormat) + .build()) + .collect(Collectors.toList()); + return targetTables; + } - if (cmd.hasOption(HELP_OPTION)) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("RunSync", OPTIONS); - return; + public static void formatConvertor( + DatasetConfig datasetConfig, + List<String> tableFormatList, + IcebergCatalogConfig icebergCatalogConfig, + Configuration hadoopConf, + ConversionSourceProvider conversionSourceProvider) { + for (DatasetConfig.Table table : datasetConfig.getDatasets()) { + log.info( + "Running sync for basePath {} for following table formats {}", + table.getTableBasePath(), + tableFormatList); + Properties sourceProperties = new Properties(); + if (table.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + } + + ConversionController conversionController = new ConversionController(hadoopConf); + SourceTable sourceTable = + sourceTableBuilder(table, icebergCatalogConfig, datasetConfig, sourceProperties); + List<TargetTable> targetTables = + targetTableBuilder(table, icebergCatalogConfig, tableFormatList); + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(SyncMode.INCREMENTAL) + .build(); + try { + conversionController.sync(conversionConfig, conversionSourceProvider); + } catch (Exception e) { + log.error(String.format("Error running sync for %s", table.getTableBasePath()), e); + } } + } + public static DatasetConfig getDatasetConfig(String d) throws IOException { + // Initialize DatasetConfig DatasetConfig datasetConfig = new DatasetConfig(); - try (InputStream inputStream = - Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) { + + try (InputStream inputStream = Files.newInputStream(Paths.get(d))) { ObjectReader objectReader = 
YAML_MAPPER.readerForUpdating(datasetConfig); objectReader.readValue(inputStream); } + return datasetConfig; + } - byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH); + public static Configuration gethadoopConf(String cmd) throws IOException { + // Load configurations + byte[] customConfig = getCustomConfigurations(cmd); Configuration hadoopConf = loadHadoopConf(customConfig); - byte[] icebergCatalogConfigInput = getCustomConfigurations(cmd, ICEBERG_CATALOG_CONFIG_PATH); + return hadoopConf; + } + + public static IcebergCatalogConfig getIcebergCatalogConfig(String cmd) throws IOException { Review Comment: Change the variable name from cmd to a more descriptive name, for example icebergCatalogConfigPath. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@xtable.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org