This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 546edaa3 Add continuous sync mode for utilities bundle, add additional 
testing
546edaa3 is described below

commit 546edaa377f56358f77e7a6395909c617caf5750
Author: Timothy Brown <[email protected]>
AuthorDate: Mon Mar 10 13:09:52 2025 -0500

    Add continuous sync mode for utilities bundle, add additional testing
---
 pom.xml                                            |   2 +-
 .../org/apache/xtable/ITConversionController.java  |  29 +++++
 .../java/org/apache/xtable/utilities/RunSync.java  |  62 +++++++++-
 .../main/resources/xtable-conversion-defaults.yaml |   1 +
 .../org/apache/xtable/utilities/ITRunSync.java     | 128 +++++++++++++++++++++
 .../org/apache/xtable/utilities/TestRunSync.java   |   9 ++
 6 files changed, 224 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3184a4be..a30a4c98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -699,7 +699,7 @@
                     <reuseForks>false</reuseForks>
                     <forkCount>6</forkCount>
                     <trimStackTrace>false</trimStackTrace>
-                    <argLine>-Xmx1024m</argLine>
+                    <argLine>-Xmx1500m</argLine>
                     
<forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
                 </configuration>
             </plugin>
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 3d539766..3325ca67 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -653,6 +653,35 @@ public class ITConversionController {
     }
   }
 
+  @Test
+  public void testIncrementalSyncsWithNoChangesDoesNotThrowError() {
+    String tableName = getTableName();
+    ConversionSourceProvider<?> conversionSourceProvider = 
getConversionSourceProvider(HUDI);
+    try (TestJavaHudiTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      ConversionConfig dualTableConfig =
+          getTableSyncConfig(
+              HUDI,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Arrays.asList(ICEBERG, DELTA),
+              null,
+              null);
+
+      table.insertRecords(50, true);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      // sync once
+      conversionController.sync(dualTableConfig, conversionSourceProvider);
+      checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50);
+      // sync again
+      conversionController.sync(dualTableConfig, conversionSourceProvider);
+      checkDatasetEquivalence(HUDI, table, Arrays.asList(DELTA, ICEBERG), 50);
+    }
+  }
+
   @Test
   public void testIcebergCorruptedSnapshotRecovery() throws Exception {
     String tableName = getTableName();
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 1a7bda87..8ac4e6e4 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -26,9 +26,15 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import lombok.Builder;
 import lombok.Data;
+import lombok.Value;
+import lombok.extern.jackson.Jacksonized;
 import lombok.extern.log4j.Log4j2;
 
 import org.apache.commons.cli.CommandLine;
@@ -42,7 +48,6 @@ import org.apache.hadoop.conf.Configuration;
 import com.fasterxml.jackson.annotation.JsonMerge;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -69,6 +74,8 @@ public class RunSync {
   private static final String HADOOP_CONFIG_PATH = "p";
   private static final String CONVERTERS_CONFIG_PATH = "c";
   private static final String ICEBERG_CATALOG_CONFIG_PATH = "i";
+  private static final String CONTINUOUS_MODE = "m";
+  private static final String CONTINUOUS_MODE_INTERVAL = "t";
   private static final String HELP_OPTION = "h";
 
   private static final Options OPTIONS =
@@ -96,6 +103,16 @@ public class RunSync {
               true,
               "The path to a yaml file containing Iceberg catalog 
configuration. The configuration will be "
                   + "used for any Iceberg source or target.")
+          .addOption(
+              CONTINUOUS_MODE,
+              "continuousMode",
+              false,
+              "Runs the tool on a scheduled loop. On each iteration, the 
process will reload the configurations from the provided file path allowing the 
user to update the tables managed by the job without restarting the job.")
+          .addOption(
+              CONTINUOUS_MODE_INTERVAL,
+              "continuousModeInterval",
+              true,
+              "The interval in seconds to schedule the loop. Requires 
--continuousMode to be set. Defaults to 5 seconds.")
           .addOption(HELP_OPTION, "help", false, "Displays help information to 
run this utility");
 
   public static void main(String[] args) throws IOException {
@@ -115,11 +132,40 @@ public class RunSync {
       return;
     }
 
-    DatasetConfig datasetConfig = new DatasetConfig();
+    if (cmd.hasOption(CONTINUOUS_MODE)) {
+      ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+      long intervalInSeconds = 
Long.parseLong(cmd.getOptionValue(CONTINUOUS_MODE_INTERVAL, "5"));
+      executorService.scheduleAtFixedRate(
+          () -> {
+            try {
+              runSync(cmd);
+            } catch (IOException ex) {
+              log.error("Sync operation failed", ex);
+            }
+          },
+          0,
+          intervalInSeconds,
+          TimeUnit.SECONDS);
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          log.debug("Received interrupt signal");
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      executorService.shutdownNow();
+    } else {
+      runSync(cmd);
+    }
+  }
+
+  private static void runSync(CommandLine cmd) throws IOException {
+    DatasetConfig datasetConfig;
     try (InputStream inputStream =
         
Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
-      ObjectReader objectReader = YAML_MAPPER.readerForUpdating(datasetConfig);
-      objectReader.readValue(inputStream);
+      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
     }
 
     byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
@@ -242,7 +288,9 @@ public class RunSync {
         : YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
   }
 
-  @Data
+  @Value
+  @Builder
+  @Jacksonized
   public static class DatasetConfig {
 
     /**
@@ -258,7 +306,9 @@ public class RunSync {
     /** Configuration of the dataset to sync, path, table name, etc. */
     List<Table> datasets;
 
-    @Data
+    @Value
+    @Builder
+    @Jacksonized
     public static class Table {
       /**
        * The base path of the table to sync. Any authentication configuration 
needed by HDFS client
diff --git 
a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml 
b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
index c80c939b..e9217a33 100644
--- a/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
+++ b/xtable-utilities/src/main/resources/xtable-conversion-defaults.yaml
@@ -29,6 +29,7 @@
 tableFormatConverters:
     HUDI:
       conversionSourceProviderClass: 
org.apache.xtable.hudi.HudiConversionSourceProvider
+      conversionTargetProviderClass: 
org.apache.xtable.hudi.HudiConversionTarget
     DELTA:
       conversionSourceProviderClass: 
org.apache.xtable.delta.DeltaConversionSourceProvider
       conversionTargetProviderClass: 
org.apache.xtable.delta.DeltaConversionTarget
diff --git 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
new file mode 100644
index 00000000..2294e16a
--- /dev/null
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import lombok.SneakyThrows;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestJavaHudiTable;
+
+class ITRunSync {
+
+  @Test
+  void testSingleSyncMode(@TempDir Path tempDir) throws IOException {
+    String tableName = "test-table";
+    try (GenericTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      table.insertRows(20);
+      File configFile = writeConfigFile(tempDir, table, tableName);
+      String[] args = new String[] {"--datasetConfig", configFile.getPath()};
+      RunSync.main(args);
+      Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + 
"/metadata"));
+      waitForNumIcebergCommits(icebergMetadataPath, 2);
+    }
+  }
+
+  @Test
+  void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
+    ExecutorService runner = Executors.newSingleThreadExecutor();
+    String tableName = "test-table";
+    try (GenericTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      table.insertRows(20);
+      File configFile = writeConfigFile(tempDir, table, tableName);
+      String[] args = new String[] {"--datasetConfig", configFile.getPath(), 
"--continuousMode"};
+      runner.submit(
+          () -> {
+            try {
+              RunSync.main(args);
+            } catch (IOException ex) {
+              throw new UncheckedIOException(ex);
+            }
+          });
+      Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + 
"/metadata"));
+      waitForNumIcebergCommits(icebergMetadataPath, 2);
+      // write more data now that table is initialized and data is synced
+      table.insertRows(20);
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
+      assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
+    } finally {
+      runner.shutdownNow();
+    }
+  }
+
+  private static File writeConfigFile(Path tempDir, GenericTable table, String 
tableName)
+      throws IOException {
+    RunSync.DatasetConfig config =
+        RunSync.DatasetConfig.builder()
+            .sourceFormat("HUDI")
+            .targetFormats(Collections.singletonList("ICEBERG"))
+            .datasets(
+                Collections.singletonList(
+                    RunSync.DatasetConfig.Table.builder()
+                        .tableBasePath(table.getBasePath())
+                        .tableName(tableName)
+                        .build()))
+            .build();
+    File configFile = new File(tempDir + "config.yaml");
+    RunSync.YAML_MAPPER.writeValue(configFile, config);
+    return configFile;
+  }
+
+  @SneakyThrows
+  private static void waitForNumIcebergCommits(Path metadataPath, int count) {
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(5)) {
+      if (numIcebergMetadataJsonFiles(metadataPath) == count) {
+        break;
+      }
+      Thread.sleep(5000);
+    }
+  }
+
+  @SneakyThrows
+  private static long numIcebergMetadataJsonFiles(Path path) {
+    long count = 0;
+    if (Files.exists(path)) {
+      count = Files.list(path).filter(p -> 
p.toString().endsWith("metadata.json")).count();
+    }
+    return count;
+  }
+}
diff --git 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
index a61d948e..20a190e8 100644
--- 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
+++ 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunSync.java
@@ -88,12 +88,21 @@ class TestRunSync {
     Assertions.assertEquals(
         "org.apache.xtable.hudi.HudiConversionSourceProvider",
         tfConverters.get(HUDI).getConversionSourceProviderClass());
+    Assertions.assertEquals(
+        "org.apache.xtable.hudi.HudiConversionTarget",
+        tfConverters.get(HUDI).getConversionTargetProviderClass());
     Assertions.assertEquals(
         "org.apache.xtable.iceberg.IcebergConversionTarget",
         tfConverters.get(ICEBERG).getConversionTargetProviderClass());
     Assertions.assertEquals(
         "org.apache.xtable.iceberg.IcebergConversionSourceProvider",
         tfConverters.get(ICEBERG).getConversionSourceProviderClass());
+    Assertions.assertEquals(
+        "org.apache.xtable.delta.DeltaConversionTarget",
+        tfConverters.get(DELTA).getConversionTargetProviderClass());
+    Assertions.assertEquals(
+        "org.apache.xtable.delta.DeltaConversionSourceProvider",
+        tfConverters.get(DELTA).getConversionSourceProviderClass());
   }
 
   @Test

Reply via email to