This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.14.2-prep by this 
push:
     new 344a8b9da44a fix: Recreate tables during meta sync on certain 
exceptions (#18218)
344a8b9da44a is described below

commit 344a8b9da44a7889780f4bcf511d2b2e59c8f66f
Author: Lin Liu <[email protected]>
AuthorDate: Tue Mar 3 23:52:12 2026 -0800

    fix: Recreate tables during meta sync on certain exceptions (#18218)
    
    Adds support for recreating tables in HMS-based meta sync when certain conditions are met.
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |  78 ++++++
 .../hudi/aws/sync/AwsGlueCatalogSyncTool.java      |   6 +
 .../hudi/config/GlueCatalogSyncClientConfig.java   |  27 +++
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  16 ++
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  63 +++++
 .../hudi/common/testutils/SchemaTestUtil.java      |  27 +++
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |   8 +
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  95 ++++++--
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |  49 ++++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 262 ++++++++++++++++++---
 .../hudi/sync/common/HoodieMetaSyncOperations.java |  29 +++
 .../apache/hudi/sync/common/HoodieSyncClient.java  |   6 +
 .../docker_java17/docker_java17_test.sh            |  71 ++++--
 13 files changed, 664 insertions(+), 73 deletions(-)
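
For reference, a minimal sketch of enabling the new recreate-on-error behavior for HMS-based sync. The config keys, the AutoCloseable HiveSyncTool, and syncHoodieTable() all appear in the patch below; the standalone class, the constructor arguments, and the base path value are illustrative assumptions:

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.HiveSyncTool;
    import org.apache.hudi.sync.common.HoodieSyncConfig;

    public class RecreateOnErrorSyncExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Point the sync at the Hoodie table (path is illustrative only)
        props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "/tmp/hoodie/trips");
        // Opt in to drop-and-recreate when a sync attempt fails (defaults to false)
        props.setProperty(HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
        try (HiveSyncTool tool = new HiveSyncTool(props, new Configuration())) {
          tool.syncHoodieTable();
        }
      }
    }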

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 0e7609aba5cd..049d633c0257 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -43,6 +43,7 @@ import 
software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
 import software.amazon.awssdk.services.glue.model.CreateTableRequest;
 import software.amazon.awssdk.services.glue.model.CreateTableResponse;
 import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
 import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
 import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
@@ -366,6 +367,52 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
   }
 
+  @Override
+  public void createOrReplaceTable(String tableName,
+                                   MessageType storageSchema,
+                                   String inputFormatClass,
+                                   String outputFormatClass,
+                                   String serdeClass,
+                                   Map<String, String> serdeProperties,
+                                   Map<String, String> tableProperties) {
+
+    if (!tableExists(tableName)) {
+      // if the table doesn't already exist, directly create a new table.
+      createTable(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      return;
+    }
+
+    try {
+      // validate before dropping the table
+      validateSchemaAndProperties(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      // drop and recreate the actual table
+      dropTable(tableName);
+      createTable(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to recreate the table" + 
tableId(databaseName, tableName), e);
+    }
+  }
+
+  /**
+   * Creates a temp table with the given schema and properties to ensure
+   * table creation succeeds before dropping the actual table and recreating it.
+   * This ensures the actual table is not dropped in case there are any
+   * issues with table creation caused by the provided schema or properties.
+   */
+  private void validateSchemaAndProperties(String tableName,
+                                           MessageType storageSchema,
+                                           String inputFormatClass,
+                                           String outputFormatClass,
+                                           String serdeClass,
+                                           Map<String, String> serdeProperties,
+                                           Map<String, String> 
tableProperties) {
+    // Create a temp table to validate the schema and properties
+    String tempTableName = generateTempTableName(tableName);
+    createTable(tempTableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+    // drop the temp table
+    dropTable(tempTableName);
+  }
+
   @Override
   public void createTable(String tableName,
                           MessageType storageSchema,
@@ -539,6 +586,27 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
   }
 
+  @Override
+  public void dropTable(String tableName) {
+    DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
+        .databaseName(databaseName)
+        .name(tableName)
+        .build();
+
+    try {
+      awsGlue.deleteTable(deleteTableRequest).get();
+      LOG.info("Successfully deleted table in AWS Glue: {}.{}", databaseName, 
tableName);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // If an InterruptedException was thrown, restore the thread's
+        // interrupted flag (set it back to true) so that subsequent
+        // handlers can also be interrupted
+        Thread.currentThread().interrupt();
+      }
+      throw new HoodieGlueSyncException("Failed to delete table " + 
tableId(databaseName, tableName), e);
+    }
+  }
+
   @Override
   public Option<String> getLastReplicatedTime(String tableName) {
     throw new UnsupportedOperationException("Not supported: 
`getLastReplicatedTime`");
@@ -554,6 +622,16 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     throw new UnsupportedOperationException("Not supported: 
`deleteLastReplicatedTimeStamp`");
   }
 
+  @Override
+  public String getTableLocation(String tableName) {
+    try {
+      Table table = getTable(awsGlue, databaseName, tableName);
+      return table.storageDescriptor().location();
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to get base path for the table " 
+ tableId(databaseName, tableName), e);
+    }
+  }
+
   private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
     List<Column> cols = new ArrayList<>();
     for (String key : mapSchema.keySet()) {
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index eed9486d69cd..a0efa70bc3bc 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 
 import java.util.Properties;
 
+import static 
org.apache.hudi.config.GlueCatalogSyncClientConfig.RECREATE_GLUE_TABLE_ON_ERROR;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 
 /**
@@ -52,6 +53,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
     syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
   }
 
+  @Override
+  protected boolean shouldRecreateAndSyncTable() {
+    return config.getBooleanOrDefault(RECREATE_GLUE_TABLE_ON_ERROR);
+  }
+
   public static void main(String[] args) {
     final HiveSyncConfig.HiveSyncConfigParams params = new 
HiveSyncConfig.HiveSyncConfigParams();
     JCommander cmd = JCommander.newBuilder().addObject(params).build();
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
 
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index efffae5bd893..c4b095d9cee5 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 /**
  * Hoodie Configs for Glue.
@@ -46,4 +49,28 @@ public class GlueCatalogSyncClientConfig extends 
HoodieConfig {
       .markAdvanced()
       .sinceVersion("0.14.0")
       .withDocumentation("Makes athena use the metadata table to list 
partitions and files. Currently it won't benefit from other features such stats 
indexes");
+
+  public static final ConfigProperty<Boolean> 
META_SYNC_PARTITION_INDEX_FIELDS_ENABLE = ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields.enable")
+      .defaultValue(false)
+      .sinceVersion("0.15.0")
+      .withDocumentation("Enable aws glue partition index feature, to speedup 
partition based query pattern");
+
+  public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS 
= ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
+      .noDefaultValue()
+      .withInferFunction(cfg -> 
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+          .or(() -> 
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
+      .sinceVersion("0.15.0")
+      .withDocumentation(String.join(" ", "Specify the partitions fields to 
index on aws glue. Separate the fields by semicolon.",
+          "By default, when the feature is enabled, all the partition will be 
indexed.",
+          "You can create up to three indexes, separate them by comma. Eg: 
col1;col2;col3,col2,col3"));
+
+  public static final ConfigProperty<Boolean> RECREATE_GLUE_TABLE_ON_ERROR = 
ConfigProperty
+      .key(GLUE_CLIENT_PROPERTY_PREFIX + "recreate_table_on_error")
+      .defaultValue(false)
+      .sinceVersion("0.14.2")
+      .markAdvanced()
+      .withDocumentation("Glue sync may fail if the Glue table exists with 
partitions differing from the Hoodie table or if schema evolution is not 
supported by Glue."
+          + "Enabling this configuration will drop and create the table to 
match the Hoodie config");
 }
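
A corresponding sketch for the Glue path. RECREATE_GLUE_TABLE_ON_ERROR and the shouldRecreateAndSyncTable() override are part of this patch; the constructor arguments are assumed to mirror HiveSyncTool (imports as in the earlier sketch):

    Properties props = new Properties();
    // Opt in to Glue-side drop-and-recreate on sync failure (defaults to false)
    props.setProperty(GlueCatalogSyncClientConfig.RECREATE_GLUE_TABLE_ON_ERROR.key(), "true");
    // AwsGlueCatalogSyncTool extends HiveSyncTool, so the same entry point applies
    try (AwsGlueCatalogSyncTool glueSync = new AwsGlueCatalogSyncTool(props, new Configuration())) {
      glueSync.syncHoodieTable();
    }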
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index a3fc268b04c0..ac7e775017ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -914,6 +914,22 @@ public class FSUtils {
     return statuses;
   }
 
+  public static boolean comparePathsWithoutScheme(String pathStr1, String 
pathStr2) {
+    Path pathWithoutScheme1 = getPathWithoutScheme(new Path(pathStr1));
+    Path pathWithoutScheme2 = getPathWithoutScheme(new Path(pathStr2));
+    return pathWithoutScheme1.equals(pathWithoutScheme2);
+  }
+
+  public static Path getPathWithoutScheme(Path path) {
+    return path.isUriPathAbsolute()
+        ? new Path(null, path.toUri().getAuthority(), path.toUri().getPath()) 
: path;
+  }
+
+  // Converts an s3a:// URL to s3://
+  public static String s3aToS3(String s3aUrl) {
+    return s3aUrl.replaceFirst("(?i)^s3a://", "s3://");
+  }
+
   /**
    * Serializable function interface.
    *
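
The new helpers in action, with values mirrored from the unit tests below (illustration only):

    // Same bucket and path, different schemes: considered equal
    FSUtils.comparePathsWithoutScheme("s3://bucket/table/base", "s3a://bucket/table/base");    // true
    // Different buckets: not equal
    FSUtils.comparePathsWithoutScheme("s3://bucket_a/table/base", "s3://bucket_b/table/base"); // false
    // Scheme rewrite for S3, case-insensitive on the scheme prefix
    FSUtils.s3aToS3("S3A://my-bucket/path"); // "s3://my-bucket/path"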
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 6018d496026f..7ad2bb3cb900 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -568,6 +568,69 @@ public class TestFSUtils extends HoodieCommonTestHarness {
         false));
   }
 
+  @Test
+  void testComparePathsWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    String path2 = "s3a://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since bucket names dont match");
+
+    path1 = "s3a://test_bucket_one/table/new_base/path";
+    path2 = "s3a://test_bucket_one/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since paths don't match");
+
+    path1 = "s3://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
false since bucket names match without file shema");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since bucket names and path matches");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return true since bucket names and path matches");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since bucket names and path matches");
+
+    path1 = "file:/var/table/base/path";
+    path2 = "/var/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since path matches");
+
+    path1 = "file:/var/table/base/path";
+    path2 = "file:/var/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should 
return false since path doesn't matches");
+
+    path1 = "table/base/path";
+    path2 = "table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return 
true since relative path doesn't matches");
+  }
+
+  @Test
+  void testS3aToS3_AWS() {
+    // Test cases for AWS S3 URLs
+    assertEquals("s3://my-bucket/path/to/object", 
FSUtils.s3aToS3("s3a://my-bucket/path/to/object"));
+    assertEquals("s3://my-bucket", FSUtils.s3aToS3("s3a://my-bucket"));
+    assertEquals("s3://MY-BUCKET/PATH/TO/OBJECT", 
FSUtils.s3aToS3("s3a://MY-BUCKET/PATH/TO/OBJECT"));
+    assertEquals("s3://my-bucket/path/to/object", 
FSUtils.s3aToS3("S3a://my-bucket/path/to/object"));
+    assertEquals("s3://my-bucket/path/to/object", 
FSUtils.s3aToS3("s3A://my-bucket/path/to/object"));
+    assertEquals("s3://my-bucket/path/to/object", 
FSUtils.s3aToS3("S3A://my-bucket/path/to/object"));
+    assertEquals("s3://my-bucket/s3a://another-bucket/another/path", 
FSUtils.s3aToS3("s3a://my-bucket/s3a://another-bucket/another/path"));
+  }
+
+  @Test
+  void testGetPathWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new 
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should 
return false since bucket names dont match");
+  }
+
   private Path getHoodieTempDir() {
     return new Path(baseUri.toString(), ".hoodie/.temp");
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 8f3cbe5b19f2..ead7b41e17ab 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -112,6 +112,24 @@ public final class SchemaTestUtil {
     }
   }
 
+  private static <T extends IndexedRecord> List<T> toRecords(Schema 
writerSchema, Schema readerSchema, String path)
+      throws IOException, URISyntaxException {
+    GenericDatumReader<T> reader = new GenericDatumReader<>(writerSchema, 
readerSchema);
+    Path dataPath = initializeSampleDataPath(path);
+
+    try (Stream<String> stream = Files.lines(dataPath)) {
+      return stream.map(s -> {
+        try {
+          return reader.read(null, 
DecoderFactory.get().jsonDecoder(writerSchema, s));
+        } catch (IOException e) {
+          throw new HoodieIOException("Could not read data from " + path, e);
+        }
+      }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read data from " + path, e);
+    }
+  }
+
   /**
    * Required to register the necessary JAR:// file system.
    * @return Path to the sample data in the resource file.
@@ -127,6 +145,15 @@ public final class SchemaTestUtil {
     }
   }
 
+  private static Path initializeSampleDataPath(String path) throws 
IOException, URISyntaxException {
+    URI resource = SchemaTestUtil.class.getResource(path).toURI();
+    if (resource.toString().contains("!")) {
+      return uriToPath(resource);
+    } else {
+      return Paths.get(SchemaTestUtil.class.getResource(path).toURI());
+    }
+  }
+
   public static Path uriToPath(URI uri) throws IOException {
     final Map<String, String> env = new HashMap<>();
     final String[] array = uri.toString().split("!");
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 73f25b1615fc..6b9b2ccd958f 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -87,6 +87,14 @@ public class HiveSyncConfig extends HoodieSyncConfig {
       .withDocumentation("Max size limit to push down partition filters, if 
the estimate push down "
           + "filters exceed this size, will directly try to fetch all 
partitions");
 
+  public static final ConfigProperty<Boolean> RECREATE_HIVE_TABLE_ON_ERROR = 
ConfigProperty
+      .key("hoodie.datasource.hive_sync.recreate_table_on_error")
+      .defaultValue(false)
+      .sinceVersion("0.14.2")
+      .markAdvanced()
+      .withDocumentation("Hive sync may fail if the Hive table exists with 
partitions differing from the Hoodie table or if schema evolution if not 
supported by Hive."
+          + "Enabling this configuration will drop and create the table to 
match the Hoodie config");
+
   public static String getBucketSpec(String bucketCols, int bucketNum) {
     return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 9d44bbdc07ef..7d9b2afa33ce 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -50,6 +51,7 @@ import static 
org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
 import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -90,7 +92,7 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
-  private HiveSyncConfig config;
+  protected HiveSyncConfig config;
   private final String databaseName;
   private final String tableName;
 
@@ -181,11 +183,11 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
         break;
       case MERGE_ON_READ:
         switch (HoodieSyncTableStrategy.valueOf(hiveSyncTableStrategy)) {
-          case RO :
+          case RO:
             // sync a RO table for MOR
             syncHoodieTable(tableName, false, true);
             break;
-          case RT :
+          case RT:
             // sync a RT table for MOR
             syncHoodieTable(tableName, true, false);
             break;
@@ -224,6 +226,48 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     LOG.info("Trying to sync hoodie table " + tableName + " with base path " + 
syncClient.getBasePath()
         + " of type " + syncClient.getTableType());
 
+    // create database if needed
+    checkAndCreateDatabase();
+
+    final boolean tableExists = syncClient.tableExists(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+    // if table exists and location of the metastore table doesn't match the 
hoodie base path, recreate the table
+    if (tableExists && 
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(), 
syncClient.getTableLocation(tableName))) {
+      LOG.info("basepath is updated for the table {}", tableName);
+      recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, 
readAsOptimized);
+      return;
+    }
+
+    boolean schemaChanged;
+    boolean propertiesChanged;
+    try {
+      if (tableExists) {
+        schemaChanged = syncSchema(tableName, schema);
+        propertiesChanged = syncProperties(tableName, useRealtimeInputFormat, 
readAsOptimized, schema);
+      } else {
+        syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized, 
schema);
+        schemaChanged = true;
+        propertiesChanged = true;
+      }
+
+      boolean partitionsChanged = validateAndSyncPartitions(tableName, 
tableExists);
+      boolean meetSyncConditions = schemaChanged || propertiesChanged || 
partitionsChanged;
+      if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || 
meetSyncConditions) {
+        syncClient.updateLastCommitTimeSynced(tableName);
+      }
+      LOG.info("Sync complete for {}", tableName);
+    } catch (HoodieHiveSyncException ex) {
+      if (shouldRecreateAndSyncTable()) {
+        LOG.warn("failed to sync the table {}, trying to recreate", tableName, 
ex);
+        recreateAndSyncHiveTable(tableName, useRealtimeInputFormat, 
readAsOptimized);
+      } else {
+        throw new HoodieHiveSyncException("failed to sync the table " + 
tableName, ex);
+      }
+    }
+  }
+
+  private void checkAndCreateDatabase() {
     // check if the database exists else create it
     if (config.getBoolean(HIVE_AUTO_CREATE_DATABASE)) {
       try {
@@ -240,22 +284,9 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
         throw new HoodieHiveSyncException("hive database does not exist " + 
databaseName);
       }
     }
+  }
 
-    final boolean tableExists = syncClient.tableExists(tableName);
-
-    // Get the parquet schema for this table looking at the latest commit
-    MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
-    boolean schemaChanged;
-    boolean propertiesChanged;
-    if (tableExists) {
-      schemaChanged = syncSchema(tableName, schema);
-      propertiesChanged = syncProperties(tableName, useRealtimeInputFormat, 
readAsOptimized, schema);
-    } else {
-      syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized, 
schema);
-      schemaChanged = true;
-      propertiesChanged = true;
-    }
-
+  private boolean validateAndSyncPartitions(String tableName, boolean 
tableExists) {
     boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
     Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
         ? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
@@ -290,12 +321,34 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
       Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced, 
lastCommitCompletionTimeSynced);
       partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, 
droppedPartitions);
     }
+    return partitionsChanged;
+  }
+
+  protected boolean shouldRecreateAndSyncTable() {
+    return config.getBooleanOrDefault(RECREATE_HIVE_TABLE_ON_ERROR);
+  }
 
-    boolean meetSyncConditions = schemaChanged || propertiesChanged || 
partitionsChanged;
-    if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
+  private void recreateAndSyncHiveTable(String tableName, boolean 
useRealtimeInputFormat, boolean readAsOptimized) {
+    LOG.info("recreating and syncing the table {}", tableName);
+    MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+    try {
+      createOrReplaceTable(tableName, useRealtimeInputFormat, readAsOptimized, 
schema);
+      syncAllPartitions(tableName);
       syncClient.updateLastCommitTimeSynced(tableName);
+    } catch (HoodieHiveSyncException ex) {
+      throw new HoodieHiveSyncException("failed to recreate the table for " + 
tableName, ex);
     }
-    LOG.info("Sync complete for " + tableName);
+  }
+
+  private void createOrReplaceTable(String tableName, boolean 
useRealtimeInputFormat, boolean readAsOptimized, MessageType schema) {
+    HoodieFileFormat baseFileFormat = 
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+    String inputFormatClassName = getInputFormatClassName(baseFileFormat, 
useRealtimeInputFormat, 
config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT));
+    String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+    String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+    Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized);
+    Map<String, String> tableProperties = getTableProperties(schema);
+    syncClient.createOrReplaceTable(tableName, schema, inputFormatClassName,
+        outputFormatClassName, serDeFormatClassName, serdeProperties, 
tableProperties);
   }
 
   private Map<String, String> getTableProperties(MessageType schema) {
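
Since config is now protected and shouldRecreateAndSyncTable() is an overridable hook, catalog-specific tools can gate recreation on their own flag, exactly as AwsGlueCatalogSyncTool does above. A hypothetical subclass for illustration (this class is not part of the patch; imports as in the first sketch):

    // Sketch only: MyCatalogSyncTool is hypothetical
    public class MyCatalogSyncTool extends HiveSyncTool {
      public MyCatalogSyncTool(Properties props, Configuration hadoopConf) {
        super(props, hadoopConf);
      }

      @Override
      protected boolean shouldRecreateAndSyncTable() {
        // Reuses the Hive-sync flag; a real tool would read its own config key
        return config.getBooleanOrDefault(HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR);
      }
    }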
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 32ad873a83d3..1fe00d1c53c1 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -228,6 +228,35 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
     }
   }
 
+  public void createOrReplaceTable(String tableName,
+                                   MessageType storageSchema,
+                                   String inputFormatClass,
+                                   String outputFormatClass,
+                                   String serdeClass,
+                                   Map<String, String> serdeProperties,
+                                   Map<String, String> tableProperties) {
+
+    if (!tableExists(tableName)) {
+      createTable(tableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+      return;
+    }
+    try {
+      // create temp table
+      String tempTableName = generateTempTableName(tableName);
+      createTable(tempTableName, storageSchema, inputFormatClass, 
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+
+      // if create table is successful, drop the actual table
+      // and rename temp table to actual table
+      dropTable(tableName);
+
+      Table table = client.getTable(databaseName, tempTableName);
+      table.setTableName(tableName);
+      client.alter_table(databaseName, tempTableName, table);
+    } catch (Exception ex) {
+      throw new HoodieHiveSyncException("failed to create table " + 
tableId(databaseName, tableName), ex);
+    }
+  }
+
   @Override
   public void createTable(String tableName, MessageType storageSchema, String 
inputFormatClass,
                           String outputFormatClass, String serdeClass,
@@ -429,4 +458,24 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
   StorageDescriptor getMetastoreStorageDescriptor(String tableName) throws 
TException {
     return client.getTable(databaseName, tableName).getSd();
   }
+
+  @Override
+  public void dropTable(String tableName) {
+    try {
+      client.dropTable(databaseName, tableName);
+      LOG.info("Successfully deleted table in Hive: {}.{}", databaseName, 
tableName);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to delete the table " + 
tableId(databaseName, tableName), e);
+    }
+  }
+
+  @Override
+  public String getTableLocation(String tableName) {
+    try {
+      Table table = client.getTable(databaseName, tableName);
+      return table.getSd().getLocation();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the basepath of the 
table " + tableId(databaseName, tableName), e);
+    }
+  }
 }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 29bb274b015a..5bd4838a62f5 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -85,6 +86,7 @@ import static 
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static 
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -243,7 +245,7 @@ public class TestHiveSyncTool {
       throw new HoodieHiveSyncException("Failed to get the metastore location 
from the table " + tableName, e);
     }
   }
-  
+
   @ParameterizedTest
   @MethodSource("syncMode")
   public void testSyncAllPartition() throws Exception {
@@ -286,7 +288,7 @@ public class TestHiveSyncTool {
         "Table partitions should match the number of partitions we wrote");
     // Drop partition with HMSDDLExecutor
     try (HMSDDLExecutor hmsExecutor =
-            new HMSDDLExecutor(hiveSyncConfig, 
IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
+             new HMSDDLExecutor(hiveSyncConfig, 
IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
       hmsExecutor.dropPartitionsToTable(HiveTestUtil.TABLE_NAME, 
Collections.singletonList("2010/02/03"));
     }
 
@@ -1031,34 +1033,34 @@ public class TestHiveSyncTool {
     String roTableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     reInitHiveSyncClient();
     assertFalse(hiveClient.tableExists(roTableName),
-            "Table " + roTableName + " should not exist initially");
+        "Table " + roTableName + " should not exist initially");
     assertFalse(hiveClient.tableExists(snapshotTableName),
-            "Table " + snapshotTableName + " should not exist initially");
+        "Table " + snapshotTableName + " should not exist initially");
     reSyncHiveTable();
     switch (strategy) {
       case RO:
         assertFalse(hiveClient.tableExists(snapshotTableName),
-                "Table " + snapshotTableName
-                        + " should not exist initially");
+            "Table " + snapshotTableName
+                + " should not exist initially");
         assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-                "Table " + HiveTestUtil.TABLE_NAME
-                        + " should exist after sync completes");
+            "Table " + HiveTestUtil.TABLE_NAME
+                + " should exist after sync completes");
         break;
       case RT:
         assertFalse(hiveClient.tableExists(roTableName),
-                "Table " + roTableName
-                        + " should not exist initially");
+            "Table " + roTableName
+                + " should not exist initially");
         assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-                "Table " + HiveTestUtil.TABLE_NAME
-                        + " should exist after sync completes");
+            "Table " + HiveTestUtil.TABLE_NAME
+                + " should exist after sync completes");
         break;
       default:
         assertTrue(hiveClient.tableExists(roTableName),
-                "Table " + roTableName
-                        + " should exist after sync completes");
+            "Table " + roTableName
+                + " should exist after sync completes");
         assertTrue(hiveClient.tableExists(snapshotTableName),
-                "Table " + snapshotTableName
-                        + " should exist after sync completes");
+            "Table " + snapshotTableName
+                + " should exist after sync completes");
     }
   }
 
@@ -1075,11 +1077,11 @@ public class TestHiveSyncTool {
     MessageType schema = hiveClient.getStorageSchema(true);
 
     assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-            "Table " + HiveTestUtil.TABLE_NAME + " should not exist 
initially");
+        "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
 
     String initInputFormatClassName = 
strategy.equals(HoodieSyncTableStrategy.RO)
-            ? HoodieParquetRealtimeInputFormat.class.getName()
-            : HoodieParquetInputFormat.class.getName();
+        ? HoodieParquetRealtimeInputFormat.class.getName()
+        : HoodieParquetInputFormat.class.getName();
 
     String outputFormatClassName = 
HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
     String serDeFormatClassName = 
HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
@@ -1087,28 +1089,28 @@ public class TestHiveSyncTool {
     // Create table 'test1'.
     hiveClient.createDatabase(HiveTestUtil.DB_NAME);
     hiveClient.createTable(HiveTestUtil.TABLE_NAME, schema, 
initInputFormatClassName,
-            outputFormatClassName, serDeFormatClassName, new HashMap<>(), new 
HashMap<>());
+        outputFormatClassName, serDeFormatClassName, new HashMap<>(), new 
HashMap<>());
     assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
-            "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
+        "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
 
     String targetInputFormatClassName = 
strategy.equals(HoodieSyncTableStrategy.RO)
-            ? HoodieParquetInputFormat.class.getName()
-            : HoodieParquetRealtimeInputFormat.class.getName();
+        ? HoodieParquetInputFormat.class.getName()
+        : HoodieParquetRealtimeInputFormat.class.getName();
 
     StorageDescriptor storageDescriptor = 
hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
     assertEquals(initInputFormatClassName, storageDescriptor.getInputFormat(),
-            "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + 
targetInputFormatClassName);
+        "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + 
targetInputFormatClassName);
     
assertFalse(storageDescriptor.getSerdeInfo().getParameters().containsKey(ConfigUtils.IS_QUERY_AS_RO_TABLE),
-            "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + 
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
+        "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + 
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
 
     reSyncHiveTable();
     storageDescriptor = 
hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
     assertEquals(targetInputFormatClassName,
-            storageDescriptor.getInputFormat(),
-            "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + 
targetInputFormatClassName);
+        storageDescriptor.getInputFormat(),
+        "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " + 
targetInputFormatClassName);
     
assertEquals(storageDescriptor.getSerdeInfo().getParameters().get(ConfigUtils.IS_QUERY_AS_RO_TABLE),
-            strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
-            "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + 
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should be ");
+        strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
+        "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " + ConfigUtils.IS_QUERY_AS_RO_TABLE + " should match the table strategy");
 
   }
 
@@ -1651,4 +1653,206 @@ public class TestHiveSyncTool {
   private int getPartitionFieldSize() {
     return 
hiveSyncProps.getString(META_SYNC_PARTITION_FIELDS.key()).split(",").length;
   }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testSyncHoodieTableCatchRuntimeException(String syncMode) throws 
Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+    // Create a tool instance to initialize the sync client
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncProps, getHiveConf());
+    try {
+      // Corrupt the table by deleting the commit file to cause a 
RuntimeException during sync
+      // This will cause doSync() to fail when it tries to read the storage 
schema
+      Path metadataPath = new Path(HiveTestUtil.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME);
+      Path instantPath = new Path(metadataPath, commitTime + ".commit");
+      if (HiveTestUtil.fileSystem.exists(instantPath)) {
+        HiveTestUtil.fileSystem.delete(instantPath, false);
+      }
+
+      // Verify that syncHoodieTable catches RuntimeException and wraps it in 
HoodieException
+      HoodieException exception = assertThrows(HoodieException.class, () -> {
+        tool.syncHoodieTable();
+      }, "syncHoodieTable should throw HoodieException when RuntimeException 
occurs");
+
+      // Verify the exception message contains the table name
+      assertTrue(exception.getMessage().contains(HiveTestUtil.TABLE_NAME),
+          "Exception message should contain table name: " + 
HiveTestUtil.TABLE_NAME);
+      assertTrue(exception.getMessage().contains("Got runtime exception when 
hive syncing"),
+          "Exception message should contain expected error message");
+
+      // Verify that the cause is a RuntimeException
+      assertTrue(exception.getCause() instanceof RuntimeException,
+          "The cause should be a RuntimeException");
+    } finally {
+      // Ensure tool is closed even if exception is thrown
+      tool.close();
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testRecreateAndSyncHiveTableOnError(String syncMode) throws 
Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
+
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+    // First sync should succeed
+    reSyncHiveTable();
+    assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table " + HiveTestUtil.TABLE_NAME + " should exist after first sync");
+    assertEquals(commitTime, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated in the 
TBLPROPERTIES");
+
+    // Verify the Hoodie table structure is intact
+    Path hoodieMetaPath = new Path(HiveTestUtil.basePath, 
HoodieTableMetaClient.METAFOLDER_NAME);
+    assertTrue(HiveTestUtil.fileSystem.exists(hoodieMetaPath),
+        "Hoodie metadata folder should exist at " + hoodieMetaPath);
+
+    // Corrupt the Hive table by dropping it and creating it with incompatible 
serde/input format
+    // This will cause syncProperties to potentially fail when trying to 
update to Parquet formats
+    ddlExecutor.runSQL("DROP TABLE IF EXISTS `" + HiveTestUtil.TABLE_NAME + 
"`");
+
+    // Create table with correct schema but wrong input/output format
+    // The location must point to the correct basePath to keep Hoodie metadata 
accessible
+    String createTableSql = String.format(
+        "CREATE TABLE `%s` (`name` STRING, `favorite_number` INT, 
`favorite_color` STRING) "
+        + "PARTITIONED BY (`datestr` STRING) "
+        + "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' "
+        + "OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' "
+        + "LOCATION '%s'",
+        HiveTestUtil.TABLE_NAME, HiveTestUtil.basePath);
+    ddlExecutor.runSQL(createTableSql);
+
+    // Now sync should attempt to update the serde properties to use Parquet 
formats
+    // If this fails with HoodieHiveSyncException, recreateAndSyncHiveTable 
should be called
+    // which will drop and recreate the table with correct formats
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    // Verify the table was recreated and synced successfully
+    assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+        "Table " + HiveTestUtil.TABLE_NAME + " should exist after recreation");
+
+    // Verify the schema is correct
+    Map<String, String> schema = 
hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME);
+    assertTrue(schema.containsKey("name"), "Schema should contain 'name' 
field");
+    assertEquals("string", schema.get("name").toLowerCase(),
+        "Schema should have correct type for 'name' field after recreation");
+    // Verify the last commit time is synced, indicating successful recreation 
and sync
+    assertEquals(commitTime, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated after recreation");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncModeAndEnablePushDown")
+  public void testRecreateCOWTableOnBasePathChange(String syncMode, String 
enablePushDown) throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    String commitTime1 = "100";
+    HiveTestUtil.createCOWTable(commitTime1, 5, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    String commitTime2 = "105";
+    // let's update the basepath
+    basePath = Files.createTempDirectory("hivesynctest_new" + 
Instant.now().toEpochMilli()).toUri().toString();
+    hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+    // let's create new table in new basepath
+    HiveTestUtil.createCOWTable(commitTime2, 2, true);
+    // Now let's create more partitions; these are the only ones which need to be synced
+    ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+    String commitTime3 = "110";
+    // let's add 2 more partitions to the new basepath
+    HiveTestUtil.addCOWPartitions(2, false, true, dateTime, commitTime3);
+
+    // reinitialize hive client
+    reInitHiveSyncClient();
+    // after reinitializing hive client, table location shouldn't match hoodie 
base path
+    assertNotEquals(hiveClient.getBasePath(), hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME),
+        "table location should not match hoodie basepath before sync");
+
+    // Let's do the sync
+    reSyncHiveTable();
+    // verify partition count should be 4 from new basepath, not 5 from old
+    assertEquals(4, 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "the 4 partitions from new base path should be present for hive");
+    // verify last commit time synced
+    assertEquals(commitTime3, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be 110");
+    // table location now should be updated to latest hoodie basepath
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location 
should match hoodie basepath");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncModeAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadWithBasePathChange(boolean 
useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws 
Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    String instantTime = "100";
+    String deltaCommitTime = "101";
+    HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+        useSchemaFromCommitMetadata);
+
+    String roTableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+    String rtTableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+    reInitHiveSyncClient();
+    assertFalse(hiveClient.tableExists(roTableName), "Table " + 
HiveTestUtil.TABLE_NAME + " should not exist initially");
+    assertFalse(hiveClient.tableExists(rtTableName), "Table " + 
HiveTestUtil.TABLE_NAME + " should not exist initially");
+    // Lets do the sync
+    reSyncHiveTable();
+
+    // change the hoodie base path
+    basePath = Files.createTempDirectory("hivesynctest_new" + 
Instant.now().toEpochMilli()).toUri().toString();
+    hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+    String instantTime2 = "102";
+    String deltaCommitTime2 = "103";
+    // let's create MOR table in the new basepath
+    HiveTestUtil.createMORTable(instantTime2, deltaCommitTime2, 2, true,
+        useSchemaFromCommitMetadata);
+
+    // let's add more partitions in the new basepath
+    ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+    String commitTime3 = "104";
+    String deltaCommitTime3 = "105";
+    HiveTestUtil.addMORPartitions(2, true, false,
+        useSchemaFromCommitMetadata, dateTime, commitTime3, deltaCommitTime3);
+
+    // reinitialize hive client
+    reInitHiveSyncClient();
+    // verify table location is different from hoodie basepath
+    assertNotEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(roTableName), "ro table location should not match 
hoodie base path before sync");
+    assertNotEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(rtTableName), "rt table location should not match 
hoodie base path before sync");
+    // Let's do the sync
+    reSyncHiveTable();
+
+    // verify partition count should be 4, not 5 from old basepath
+    assertEquals(4, hiveClient.getAllPartitions(roTableName).size(),
+        "the 4 partitions from new base path should be present for ro table");
+    assertEquals(4, hiveClient.getAllPartitions(rtTableName).size(),
+        "the 4 partitions from new base path should be present for rt table");
+    // verify last synced commit time
+    assertEquals(deltaCommitTime3, hiveClient.getLastCommitTimeSynced(roTableName).get(),
+        "The last commit that was synced should be 105");
+    assertEquals(deltaCommitTime3, hiveClient.getLastCommitTimeSynced(rtTableName).get(),
+        "The last commit that was synced should be 105");
+    // verify table location is updated to the new hoodie basepath
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(roTableName), "ro table location should match 
hoodie base path after sync");
+    assertEquals(hiveClient.getBasePath(), 
hiveClient.getTableLocation(rtTableName), "rt table location should match 
hoodie base path after sync");
+  }
 }
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index 87af1d16d75c..c1d1f5bf22c2 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.sync.common;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
 
@@ -56,6 +57,27 @@ public interface HoodieMetaSyncOperations {
 
   }
 
+  /**
+   * Create or replace the table.
+   *
+   * @param tableName         The table name.
+   * @param storageSchema     The table schema.
+   * @param inputFormatClass  The input format class of this table.
+   * @param outputFormatClass The output format class of this table.
+   * @param serdeClass        The serde class of this table.
+   * @param serdeProperties   The serde properties of this table.
+   * @param tableProperties   The table properties for this table.
+   */
+  default void createOrReplaceTable(String tableName,
+                                    MessageType storageSchema,
+                                    String inputFormatClass,
+                                    String outputFormatClass,
+                                    String serdeClass,
+                                    Map<String, String> serdeProperties,
+                                    Map<String, String> tableProperties) {
+
+  }
+
   /**
    * Check if table exists in metastore.
    */
@@ -165,6 +187,13 @@ public interface HoodieMetaSyncOperations {
     return Collections.emptyList();
   }
 
+  /**
+   * Get the base path of the table from the metastore.
+   */
+  default String getTableLocation(String tableName) {
+    return StringUtils.EMPTY_STRING;
+  }
+
   /**
    * Update the field comments for table in metastore, by using the ones from 
storage.
    *
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 4c5fb01b9e75..6ec35c435a05 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -56,6 +57,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   protected final HoodieSyncConfig config;
   protected final PartitionValueExtractor partitionValueExtractor;
   protected final HoodieTableMetaClient metaClient;
+  private static final String TEMP_SUFFIX = "_temp";
 
   public HoodieSyncClient(HoodieSyncConfig config) {
     this.config = config;
@@ -244,4 +246,8 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     }
     return paths;
   }
+
+  protected String generateTempTableName(String tableName) {
+    return tableName + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond();
+  }
 }
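
For context, the temp-table names used by the validate-then-swap flow above have the shape tableName + "_temp" + epoch-seconds. A quick illustration (the concrete epoch value is made up):

    // Inside a HoodieSyncClient subclass (generateTempTableName is protected):
    String tempName = generateTempTableName("trips");
    // -> e.g. "trips_temp1709500000"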
diff --git a/packaging/bundle-validation/docker_java17/docker_java17_test.sh 
b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
index 7fcc9e5000e3..ce32ca4fa4ac 100755
--- a/packaging/bundle-validation/docker_java17/docker_java17_test.sh
+++ b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
@@ -51,28 +51,18 @@ start_datanode () {
 
   echo "::warning::docker_test_java17.sh starting datanode:"$DN
 
-  cat $HADOOP_HOME/hadoop/etc/hdfs-site.xml
-  cat $HADOOP_HOME/hadoop/etc/core-site.xml
-
   DN_DIR_PREFIX=$DOCKER_TEST_DIR/additional_datanode/
   PID_DIR=$DOCKER_TEST_DIR/pid/$1
 
-  if [ -z $DN_DIR_PREFIX ]; then
-    mkdir -p $DN_DIR_PREFIX
-  fi
-
-  if [ -z $PID_DIR ]; then
-    mkdir -p $PID_DIR
-  fi
-
-  export HADOOP_PID_DIR=$PID_PREFIX
+  mkdir -p $DN_DIR_PREFIX $PID_DIR
+  export HADOOP_PID_DIR=$PID_DIR
   DN_CONF_OPTS="\
   -Dhadoop.tmp.dir=$DN_DIR_PREFIX$DN\
   -Ddfs.datanode.address=localhost:5001$DN \
   -Ddfs.datanode.http.address=localhost:5008$DN \
   -Ddfs.datanode.ipc.address=localhost:5002$DN"
-  $HADOOP_HOME/bin/hdfs --daemon start datanode $DN_CONF_OPTS
-  $HADOOP_HOME/bin/hdfs dfsadmin -report
+  bash $HADOOP_HOME/bin/hdfs --daemon start datanode $DN_CONF_OPTS
+  bash $HADOOP_HOME/bin/hdfs dfsadmin -report
 }
 
 setup_hdfs () {
@@ -80,20 +70,41 @@ setup_hdfs () {
   mv /opt/bundle-validation/tmp-conf-dir/hdfs-site.xml 
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
   mv /opt/bundle-validation/tmp-conf-dir/core-site.xml 
$HADOOP_HOME/etc/hadoop/core-site.xml
 
-  $HADOOP_HOME/bin/hdfs namenode -format
-  $HADOOP_HOME/bin/hdfs --daemon start namenode
-  echo "::warning::docker_test_java17.sh starting hadoop hdfs"
-  $HADOOP_HOME/sbin/start-dfs.sh
+  mkdir -p $DOCKER_TEST_DIR/pid
+  export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid
 
-  # start datanodes
+  bash $HADOOP_HOME/bin/hdfs namenode -format
+  bash $HADOOP_HOME/bin/hdfs --daemon start namenode
+
+  echo "::warning::docker_test_java17.sh waiting for NameNode to start"
+  NAMENODE_READY=0
+  for i in $(seq 1 30); do
+    if bash $HADOOP_HOME/bin/hdfs dfsadmin -report >/dev/null 2>&1; then
+      NAMENODE_READY=1
+      break
+    fi
+    sleep 2
+  done
+  if [ "$NAMENODE_READY" -ne 1 ]; then
+    echo "::error::docker_test_java17.sh NameNode failed to start"
+    exit 1
+  fi
+
+  echo "::warning::docker_test_java17.sh starting default datanode"
+  bash $HADOOP_HOME/bin/hdfs --daemon start datanode
+
+  echo "::warning::docker_test_java17.sh waiting for DataNode to register"
+  sleep 10
+
+  # start additional datanodes
   for i in $(seq 1 3)
   do
     start_datanode $i
   done
 
   echo "::warning::docker_test_java17.sh starting hadoop hdfs, hdfs report"
-  $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/root
-  $HADOOP_HOME/bin/hdfs dfs -ls /user/
+  bash $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/root
+  bash $HADOOP_HOME/bin/hdfs dfs -ls /user/
   if [ "$?" -ne 0 ]; then
     echo "::error::docker_test_java17.sh Failed setting up HDFS!"
     exit 1
@@ -103,7 +114,14 @@ setup_hdfs () {
 stop_hdfs() {
   use_default_java_runtime
   echo "::warning::docker_test_java17.sh stopping hadoop hdfs"
-  $HADOOP_HOME/sbin/stop-dfs.sh
+  export HADOOP_PID_DIR=${HADOOP_PID_DIR:-$DOCKER_TEST_DIR/pid}
+  bash $HADOOP_HOME/bin/hdfs --daemon stop datanode 2>/dev/null || true
+  for i in 1 2 3; do
+    export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid/$i
+    bash $HADOOP_HOME/bin/hdfs --daemon stop datanode 2>/dev/null || true
+  done
+  export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid
+  bash $HADOOP_HOME/bin/hdfs --daemon stop namenode 2>/dev/null || true
 }
 
 build_hudi_java8 () {
@@ -123,7 +141,14 @@ build_hudi_java8 () {
     mkdir -p $JARS_DIR
   fi
 
-  cp ./packaging/hudi-spark-bundle/target/hudi-spark*.jar $JARS_DIR/spark.jar
+  echo "::warning::docker_test_java17.sh copy hudi-spark bundle jar to target 
folder"
+  cp ./packaging/hudi-spark-bundle/target/hudi-spark*SNAPSHOT.jar 
$JARS_DIR/spark.jar
+  echo "::warning::docker_test_java17.sh copy hudi-spark bundle jar to target 
folder DONE"
+
+  if [ "$?" -ne 0 ]; then
+    echo "::error::docker_test_java17.sh Failed to copy hudi-spark bundle jar 
to target folder"
+    exit 1
+  fi
 }
 
 run_docker_tests() {
