the-other-tim-brown commented on code in PR #659:
URL: https://github.com/apache/incubator-xtable/pull/659#discussion_r1986364791
##########
xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java:
##########
@@ -142,63 +221,58 @@ public static void main(String[] args) throws IOException
{
ConversionSourceProvider<?> conversionSourceProvider =
ReflectionUtils.createInstanceOfClass(sourceProviderClass);
conversionSourceProvider.init(hadoopConf);
+ return conversionSourceProvider;
+ }
+ public static List<String> getTableFormatList(DatasetConfig datasetConfig)
throws IOException {
+ // Retrieve table format list
List<String> tableFormatList = datasetConfig.getTargetFormats();
- ConversionController conversionController = new
ConversionController(hadoopConf);
- for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
- log.info(
- "Running sync for basePath {} for following table formats {}",
- table.getTableBasePath(),
- tableFormatList);
- Properties sourceProperties = new Properties();
- if (table.getPartitionSpec() != null) {
- sourceProperties.put(
- HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG,
table.getPartitionSpec());
- }
- SourceTable sourceTable =
- SourceTable.builder()
- .name(table.getTableName())
- .basePath(table.getTableBasePath())
- .namespace(table.getNamespace() == null ? null :
table.getNamespace().split("\\."))
- .dataPath(table.getTableDataPath())
- .catalogConfig(icebergCatalogConfig)
- .additionalProperties(sourceProperties)
- .formatName(sourceFormat)
- .build();
- List<TargetTable> targetTables =
- tableFormatList.stream()
- .map(
- tableFormat ->
- TargetTable.builder()
- .name(table.getTableName())
- .basePath(table.getTableBasePath())
- .namespace(
- table.getNamespace() == null
- ? null
- : table.getNamespace().split("\\."))
- .catalogConfig(icebergCatalogConfig)
- .formatName(tableFormat)
- .build())
- .collect(Collectors.toList());
+ return tableFormatList;
+ }
- ConversionConfig conversionConfig =
- ConversionConfig.builder()
- .sourceTable(sourceTable)
- .targetTables(targetTables)
- .syncMode(SyncMode.INCREMENTAL)
- .build();
- try {
- conversionController.sync(conversionConfig, conversionSourceProvider);
- } catch (Exception e) {
- log.error("Error running sync for {}", table.getTableBasePath(), e);
- }
+ public static CommandLine CommandParser(String[] args) {
+ CommandLineParser parser = new DefaultParser();
+
+ CommandLine cmd;
+ try {
+ cmd = parser.parse(OPTIONS, args);
+ } catch (ParseException e) {
+ new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+ return null;
+ }
+
+ if (cmd.hasOption(HELP_OPTION)) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("RunSync", OPTIONS);
+ return null;
}
+ return cmd;
+ }
+
+ public static String getValueFromConfig(CommandLine cmd, String configFlag) {
+ return cmd.getOptionValue(configFlag);
+ }
+
+ public static void main(String[] args) throws IOException {
+ CommandLine cmd = CommandParser(args);
+ String datasetConfigpath = getValueFromConfig(cmd, DATASET_CONFIG_OPTION);
+ String icebergCatalogConfigpath = getValueFromConfig(cmd,
ICEBERG_CATALOG_CONFIG_PATH);
+ String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
+ String conversionProviderConfigpath = getValueFromConfig(cmd,
CONVERTERS_CONFIG_PATH);
+ DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath);
+ IcebergCatalogConfig icebergCatalogConfig =
getIcebergCatalogConfig(icebergCatalogConfigpath);
+ Configuration hadoopConf = gethadoopConf(hadoopConfigpath);
+ ConversionSourceProvider conversionSourceProvider =
+ getConversionSourceProvider(conversionProviderConfigpath,
datasetConfig, hadoopConf);
+ List<String> tableFormatList = getTableFormatList(datasetConfig);
+ formatConvertor(
+ datasetConfig, tableFormatList, icebergCatalogConfig, hadoopConf,
conversionSourceProvider);
}
- static byte[] getCustomConfigurations(CommandLine cmd, String option) throws
IOException {
+ static byte[] getCustomConfigurations(String cmd) throws IOException {
Review Comment:
The variable name implies this is still the command-line object, but I am
guessing this is now a path? If so, consider renaming the parameter (e.g. `configPath`).
##########
xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java:
##########
@@ -22,19 +22,67 @@
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.apache.xtable.iceberg.IcebergCatalogConfig;
+import org.apache.xtable.utilities.RunSync.DatasetConfig;
import org.apache.xtable.utilities.RunSync.TableFormatConverters;
import
org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
class TestRunSync {
+ @Test
+ public void testMain() {
+ String projectRoot = System.getProperty("user.dir");
+ // Construct the path to the file in the root directory
+ int lastSlashIndex = projectRoot.lastIndexOf('/');
+ String result = projectRoot.substring(0, lastSlashIndex);
+ File file = new File(result, "/my_config.yaml");
+ String filePath = file.getPath();
+ String[] args = new String[] {"--datasetConfig", filePath};
+ try {
+ RunSync.main(args);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ @Test
+ public void testNonExistentFile() {
Review Comment:
```suggestion
public void testGetDatasetConfigWithNonExistentFile() {
```
##########
xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java:
##########
@@ -142,63 +221,58 @@ public static void main(String[] args) throws IOException
{
ConversionSourceProvider<?> conversionSourceProvider =
ReflectionUtils.createInstanceOfClass(sourceProviderClass);
conversionSourceProvider.init(hadoopConf);
+ return conversionSourceProvider;
+ }
+ public static List<String> getTableFormatList(DatasetConfig datasetConfig)
throws IOException {
+ // Retrieve table format list
List<String> tableFormatList = datasetConfig.getTargetFormats();
- ConversionController conversionController = new
ConversionController(hadoopConf);
- for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
- log.info(
- "Running sync for basePath {} for following table formats {}",
- table.getTableBasePath(),
- tableFormatList);
- Properties sourceProperties = new Properties();
- if (table.getPartitionSpec() != null) {
- sourceProperties.put(
- HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG,
table.getPartitionSpec());
- }
- SourceTable sourceTable =
- SourceTable.builder()
- .name(table.getTableName())
- .basePath(table.getTableBasePath())
- .namespace(table.getNamespace() == null ? null :
table.getNamespace().split("\\."))
- .dataPath(table.getTableDataPath())
- .catalogConfig(icebergCatalogConfig)
- .additionalProperties(sourceProperties)
- .formatName(sourceFormat)
- .build();
- List<TargetTable> targetTables =
- tableFormatList.stream()
- .map(
- tableFormat ->
- TargetTable.builder()
- .name(table.getTableName())
- .basePath(table.getTableBasePath())
- .namespace(
- table.getNamespace() == null
- ? null
- : table.getNamespace().split("\\."))
- .catalogConfig(icebergCatalogConfig)
- .formatName(tableFormat)
- .build())
- .collect(Collectors.toList());
+ return tableFormatList;
+ }
- ConversionConfig conversionConfig =
- ConversionConfig.builder()
- .sourceTable(sourceTable)
- .targetTables(targetTables)
- .syncMode(SyncMode.INCREMENTAL)
- .build();
- try {
- conversionController.sync(conversionConfig, conversionSourceProvider);
- } catch (Exception e) {
- log.error("Error running sync for {}", table.getTableBasePath(), e);
- }
+ public static CommandLine CommandParser(String[] args) {
Review Comment:
Make sure to follow the Java naming convention of starting method names with a
lowercase letter (e.g. `commandParser` instead of `CommandParser`).
##########
xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java:
##########
@@ -98,37 +99,115 @@ public class RunSync {
+ "used for any Iceberg source or target.")
.addOption(HELP_OPTION, "help", false, "Displays help information to
run this utility");
- public static void main(String[] args) throws IOException {
- CommandLineParser parser = new DefaultParser();
+ public static SourceTable sourceTableBuilder(
+ DatasetConfig.Table table,
+ IcebergCatalogConfig icebergCatalogConfig,
+ DatasetConfig datasetConfig,
+ Properties sourceProperties) {
+ Objects.requireNonNull(table, "Table cannot be null");
+ Objects.requireNonNull(datasetConfig, "datasetConfig cannot be null");
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(table.getTableName())
+ .basePath(table.getTableBasePath())
+ .namespace(table.getNamespace() == null ? null :
table.getNamespace().split("\\."))
+ .dataPath(table.getTableDataPath())
+ .catalogConfig(icebergCatalogConfig)
+ .additionalProperties(sourceProperties)
+ .formatName(datasetConfig.sourceFormat)
+ .build();
+ return sourceTable;
+ }
- CommandLine cmd;
- try {
- cmd = parser.parse(OPTIONS, args);
- } catch (ParseException e) {
- new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
- return;
- }
+ public static List<TargetTable> targetTableBuilder(
+ DatasetConfig.Table table,
+ IcebergCatalogConfig icebergCatalogConfig,
+ List<String> tableFormatList) {
+ Objects.requireNonNull(table, "Table cannot be null");
+ Objects.requireNonNull(tableFormatList, "tableFormatList cannot be null");
+ List<TargetTable> targetTables =
+ tableFormatList.stream()
+ .map(
+ tableFormat ->
+ TargetTable.builder()
+ .name(table.getTableName())
+ .basePath(table.getTableBasePath())
+ .namespace(
+ table.getNamespace() == null ? null :
table.getNamespace().split("\\."))
+ .catalogConfig(icebergCatalogConfig)
+ .formatName(tableFormat)
+ .build())
+ .collect(Collectors.toList());
+ return targetTables;
+ }
- if (cmd.hasOption(HELP_OPTION)) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("RunSync", OPTIONS);
- return;
+ public static void formatConvertor(
+ DatasetConfig datasetConfig,
+ List<String> tableFormatList,
+ IcebergCatalogConfig icebergCatalogConfig,
+ Configuration hadoopConf,
+ ConversionSourceProvider conversionSourceProvider) {
+ ConversionController conversionController = new
ConversionController(hadoopConf);
+ for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
+ log.info(
+ "Running sync for basePath {} for following table formats {}",
+ table.getTableBasePath(),
+ tableFormatList);
+ Properties sourceProperties = new Properties();
+ if (table.getPartitionSpec() != null) {
+ sourceProperties.put(
+ HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG,
table.getPartitionSpec());
+ }
+
+ SourceTable sourceTable =
+ sourceTableBuilder(table, icebergCatalogConfig, datasetConfig,
sourceProperties);
+ List<TargetTable> targetTables =
+ targetTableBuilder(table, icebergCatalogConfig, tableFormatList);
+ ConversionConfig conversionConfig =
+ ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
+ .syncMode(SyncMode.INCREMENTAL)
+ .build();
+ try {
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ } catch (Exception e) {
+ log.error(String.format("Error running sync for %s",
table.getTableBasePath()), e);
+ }
}
+ }
+ public static DatasetConfig getDatasetConfig(String datasetConfigPath)
throws IOException {
Review Comment:
For these new methods, will they be used elsewhere? Can they be made private?
##########
xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java:
##########
@@ -142,63 +221,58 @@ public static void main(String[] args) throws IOException
{
ConversionSourceProvider<?> conversionSourceProvider =
ReflectionUtils.createInstanceOfClass(sourceProviderClass);
conversionSourceProvider.init(hadoopConf);
+ return conversionSourceProvider;
+ }
+ public static List<String> getTableFormatList(DatasetConfig datasetConfig)
throws IOException {
+ // Retrieve table format list
List<String> tableFormatList = datasetConfig.getTargetFormats();
Review Comment:
Why do we need a method that just wraps a getter? Callers can invoke `datasetConfig.getTargetFormats()` directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]