This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.14.2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.14.2-prep by this
push:
new 344a8b9da44a fix: Recreate tables during meta sync on certain
exceptions (#18218)
344a8b9da44a is described below
commit 344a8b9da44a7889780f4bcf511d2b2e59c8f66f
Author: Lin Liu <[email protected]>
AuthorDate: Tue Mar 3 23:52:12 2026 -0800
fix: Recreate tables during meta sync on certain exceptions (#18218)
Adding support to recreate tables in hms based meta sync, when certain
conditions are met.
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 78 ++++++
.../hudi/aws/sync/AwsGlueCatalogSyncTool.java | 6 +
.../hudi/config/GlueCatalogSyncClientConfig.java | 27 +++
.../java/org/apache/hudi/common/fs/FSUtils.java | 16 ++
.../org/apache/hudi/common/fs/TestFSUtils.java | 63 +++++
.../hudi/common/testutils/SchemaTestUtil.java | 27 +++
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 8 +
.../java/org/apache/hudi/hive/HiveSyncTool.java | 95 ++++++--
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 49 ++++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 262 ++++++++++++++++++---
.../hudi/sync/common/HoodieMetaSyncOperations.java | 29 +++
.../apache/hudi/sync/common/HoodieSyncClient.java | 6 +
.../docker_java17/docker_java17_test.sh | 71 ++++--
13 files changed, 664 insertions(+), 73 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 0e7609aba5cd..049d633c0257 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -43,6 +43,7 @@ import
software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.CreateTableResponse;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
@@ -366,6 +367,52 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
+ @Override
+ public void createOrReplaceTable(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String> tableProperties) {
+
+ if (!tableExists(tableName)) {
+ // if table doesn't exist before, directly create new table.
+ createTable(tableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ return;
+ }
+
+ try {
+ // validate before dropping the table
+ validateSchemaAndProperties(tableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ // drop and recreate the actual table
+ dropTable(tableName);
+ createTable(tableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ } catch (Exception e) {
+      throw new HoodieGlueSyncException("Fail to recreate the table " +
tableId(databaseName, tableName), e);
+ }
+ }
+
+ /**
+ * creates a temp table with the given schema and properties to ensure
+ * table creation succeeds before dropping the table and recreating it.
+ * This ensures that actual table is not dropped in case there are any
+ * issues with table creation because of provided schema or properties
+ */
+ private void validateSchemaAndProperties(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String>
tableProperties) {
+ // Create a temp table to validate the schema and properties
+ String tempTableName = generateTempTableName(tableName);
+ createTable(tempTableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ // drop the temp table
+ dropTable(tempTableName);
+ }
+
@Override
public void createTable(String tableName,
MessageType storageSchema,
@@ -539,6 +586,27 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
}
}
+ @Override
+ public void dropTable(String tableName) {
+ DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
+ .databaseName(databaseName)
+ .name(tableName)
+ .build();
+
+ try {
+ awsGlue.deleteTable(deleteTableRequest).get();
+ LOG.info("Successfully deleted table in AWS Glue: {}.{}", databaseName,
tableName);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // In case {@code InterruptedException} was thrown, resetting the
interrupted flag
+ // of the thread, we reset it (to true) again to permit subsequent
handlers
+ // to be interrupted as well
+ Thread.currentThread().interrupt();
+ }
+ throw new HoodieGlueSyncException("Failed to delete table " +
tableId(databaseName, tableName), e);
+ }
+ }
+
@Override
public Option<String> getLastReplicatedTime(String tableName) {
throw new UnsupportedOperationException("Not supported:
`getLastReplicatedTime`");
@@ -554,6 +622,16 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
throw new UnsupportedOperationException("Not supported:
`deleteLastReplicatedTimeStamp`");
}
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = getTable(awsGlue, databaseName, tableName);
+ return table.storageDescriptor().location();
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Fail to get base path for the table "
+ tableId(databaseName, tableName), e);
+ }
+ }
+
private List<Column> getColumnsFromSchema(Map<String, String> mapSchema) {
List<Column> cols = new ArrayList<>();
for (String key : mapSchema.keySet()) {
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index eed9486d69cd..a0efa70bc3bc 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
+import static
org.apache.hudi.config.GlueCatalogSyncClientConfig.RECREATE_GLUE_TABLE_ON_ERROR;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
/**
@@ -52,6 +53,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
syncClient = new AWSGlueCatalogSyncClient(hiveSyncConfig);
}
+ @Override
+ protected boolean shouldRecreateAndSyncTable() {
+ return config.getBooleanOrDefault(RECREATE_GLUE_TABLE_ON_ERROR);
+ }
+
public static void main(String[] args) {
final HiveSyncConfig.HiveSyncConfigParams params = new
HiveSyncConfig.HiveSyncConfigParams();
JCommander cmd = JCommander.newBuilder().addObject(params).build();
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
index efffae5bd893..c4b095d9cee5 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/config/GlueCatalogSyncClientConfig.java
@@ -22,6 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
/**
* Hoodie Configs for Glue.
@@ -46,4 +49,28 @@ public class GlueCatalogSyncClientConfig extends
HoodieConfig {
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Makes athena use the metadata table to list
partitions and files. Currently it won't benefit from other features such stats
indexes");
+
+ public static final ConfigProperty<Boolean>
META_SYNC_PARTITION_INDEX_FIELDS_ENABLE = ConfigProperty
+ .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields.enable")
+ .defaultValue(false)
+ .sinceVersion("0.15.0")
+ .withDocumentation("Enable aws glue partition index feature, to speedup
partition based query pattern");
+
+ public static final ConfigProperty<String> META_SYNC_PARTITION_INDEX_FIELDS
= ConfigProperty
+ .key(GLUE_CLIENT_PROPERTY_PREFIX + "partition_index_fields")
+ .noDefaultValue()
+ .withInferFunction(cfg ->
Option.ofNullable(cfg.getString(HoodieTableConfig.PARTITION_FIELDS))
+ .or(() ->
Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))))
+ .sinceVersion("0.15.0")
+ .withDocumentation(String.join(" ", "Specify the partitions fields to
index on aws glue. Separate the fields by semicolon.",
+ "By default, when the feature is enabled, all the partition will be
indexed.",
+ "You can create up to three indexes, separate them by comma. Eg:
col1;col2;col3,col2,col3"));
+
+ public static final ConfigProperty<Boolean> RECREATE_GLUE_TABLE_ON_ERROR =
ConfigProperty
+ .key(GLUE_CLIENT_PROPERTY_PREFIX + "recreate_table_on_error")
+ .defaultValue(false)
+ .sinceVersion("0.14.2")
+ .markAdvanced()
+ .withDocumentation("Glue sync may fail if the Glue table exists with
partitions differing from the Hoodie table or if schema evolution is not
supported by Glue. "
+ + "Enabling this configuration will drop and create the table to
match the Hoodie config");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index a3fc268b04c0..ac7e775017ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -914,6 +914,22 @@ public class FSUtils {
return statuses;
}
+ public static boolean comparePathsWithoutScheme(String pathStr1, String
pathStr2) {
+ Path pathWithoutScheme1 = getPathWithoutScheme(new Path(pathStr1));
+ Path pathWithoutScheme2 = getPathWithoutScheme(new Path(pathStr2));
+ return pathWithoutScheme1.equals(pathWithoutScheme2);
+ }
+
+ public static Path getPathWithoutScheme(Path path) {
+ return path.isUriPathAbsolute()
+ ? new Path(null, path.toUri().getAuthority(), path.toUri().getPath())
: path;
+ }
+
+  // Converts an s3a:// URL to an s3:// URL
+ public static String s3aToS3(String s3aUrl) {
+ return s3aUrl.replaceFirst("(?i)^s3a://", "s3://");
+ }
+
/**
* Serializable function interface.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 6018d496026f..7ad2bb3cb900 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -568,6 +568,69 @@ public class TestFSUtils extends HoodieCommonTestHarness {
false));
}
+ @Test
+ void testComparePathsWithoutScheme() {
+ String path1 = "s3://test_bucket_one/table/base/path";
+ String path2 = "s3a://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should
return false since bucket names don't match");
+
+ path1 = "s3a://test_bucket_one/table/new_base/path";
+ path2 = "s3a://test_bucket_one/table/old_base/path";
+ assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should
return false since paths don't match");
+
+    path1 = "s3://test_bucket_one/table/base/path";
+    path2 = "s3a://test_bucket_one/table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since paths match ignoring the file scheme");
+
+ path1 = "s3a://test_bucket_one/table/base/path";
+ path2 = "s3a://test_bucket_one/table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since bucket names and path matches");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    path2 = "gs://test_bucket_two/table/base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should
return false since bucket names don't match");
+
+ path1 = "gs://test_bucket_one/table/base/path";
+ path2 = "gs://test_bucket_one/table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since bucket names and path matches");
+
+ path1 = "file:/var/table/base/path";
+ path2 = "/var/table/base/path";
+ assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since path matches");
+
+    path1 = "file:/var/table/base/path";
+    path2 = "file:/var/table/old_base/path";
+    assertFalse(FSUtils.comparePathsWithoutScheme(path1, path2), "should
return false since paths don't match");
+
+    path1 = "table/base/path";
+    path2 = "table/base/path";
+    assertTrue(FSUtils.comparePathsWithoutScheme(path1, path2), "should return
true since relative paths match");
+ }
+
+ @Test
+ void testS3aToS3_AWS() {
+ // Test cases for AWS S3 URLs
+ assertEquals("s3://my-bucket/path/to/object",
FSUtils.s3aToS3("s3a://my-bucket/path/to/object"));
+ assertEquals("s3://my-bucket", FSUtils.s3aToS3("s3a://my-bucket"));
+ assertEquals("s3://MY-BUCKET/PATH/TO/OBJECT",
FSUtils.s3aToS3("s3a://MY-BUCKET/PATH/TO/OBJECT"));
+ assertEquals("s3://my-bucket/path/to/object",
FSUtils.s3aToS3("S3a://my-bucket/path/to/object"));
+ assertEquals("s3://my-bucket/path/to/object",
FSUtils.s3aToS3("s3A://my-bucket/path/to/object"));
+ assertEquals("s3://my-bucket/path/to/object",
FSUtils.s3aToS3("S3A://my-bucket/path/to/object"));
+ assertEquals("s3://my-bucket/s3a://another-bucket/another/path",
FSUtils.s3aToS3("s3a://my-bucket/s3a://another-bucket/another/path"));
+ }
+
+  @Test
+  void testGetPathWithoutScheme() {
+    String path1 = "s3://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should
strip the s3 scheme from the path");
+
+    path1 = "s3a://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should
strip the s3a scheme from the path");
+
+    path1 = "gs://test_bucket_one/table/base/path";
+    assertEquals(FSUtils.getPathWithoutScheme(new
Path(path1)).toUri().toString(), "//test_bucket_one/table/base/path", "should
strip the gs scheme from the path");
+  }
+
private Path getHoodieTempDir() {
return new Path(baseUri.toString(), ".hoodie/.temp");
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 8f3cbe5b19f2..ead7b41e17ab 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -112,6 +112,24 @@ public final class SchemaTestUtil {
}
}
+ private static <T extends IndexedRecord> List<T> toRecords(Schema
writerSchema, Schema readerSchema, String path)
+ throws IOException, URISyntaxException {
+ GenericDatumReader<T> reader = new GenericDatumReader<>(writerSchema,
readerSchema);
+ Path dataPath = initializeSampleDataPath(path);
+
+ try (Stream<String> stream = Files.lines(dataPath)) {
+ return stream.map(s -> {
+ try {
+ return reader.read(null,
DecoderFactory.get().jsonDecoder(writerSchema, s));
+ } catch (IOException e) {
+ throw new HoodieIOException("Could not read data from " + path, e);
+ }
+ }).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HoodieIOException("Could not read data from " + path, e);
+ }
+ }
+
/**
* Required to register the necessary JAR:// file system.
* @return Path to the sample data in the resource file.
@@ -127,6 +145,15 @@ public final class SchemaTestUtil {
}
}
+ private static Path initializeSampleDataPath(String path) throws
IOException, URISyntaxException {
+ URI resource = SchemaTestUtil.class.getResource(path).toURI();
+ if (resource.toString().contains("!")) {
+ return uriToPath(resource);
+ } else {
+ return Paths.get(SchemaTestUtil.class.getResource(path).toURI());
+ }
+ }
+
public static Path uriToPath(URI uri) throws IOException {
final Map<String, String> env = new HashMap<>();
final String[] array = uri.toString().split("!");
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 73f25b1615fc..6b9b2ccd958f 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -87,6 +87,14 @@ public class HiveSyncConfig extends HoodieSyncConfig {
.withDocumentation("Max size limit to push down partition filters, if
the estimate push down "
+ "filters exceed this size, will directly try to fetch all
partitions");
+ public static final ConfigProperty<Boolean> RECREATE_HIVE_TABLE_ON_ERROR =
ConfigProperty
+ .key("hoodie.datasource.hive_sync.recreate_table_on_error")
+ .defaultValue(false)
+ .sinceVersion("0.14.2")
+ .markAdvanced()
+ .withDocumentation("Hive sync may fail if the Hive table exists with
partitions differing from the Hoodie table or if schema evolution is not
supported by Hive. "
+ + "Enabling this configuration will drop and create the table to
match the Hoodie config");
+
public static String getBucketSpec(String bucketCols, int bucketNum) {
return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 9d44bbdc07ef..7d9b2afa33ce 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.util.ConfigUtils;
@@ -50,6 +51,7 @@ import static
org.apache.hudi.common.util.StringUtils.nonEmpty;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
import static
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -90,7 +92,7 @@ public class HiveSyncTool extends HoodieSyncTool implements
AutoCloseable {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
- private HiveSyncConfig config;
+ protected HiveSyncConfig config;
private final String databaseName;
private final String tableName;
@@ -181,11 +183,11 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
break;
case MERGE_ON_READ:
switch (HoodieSyncTableStrategy.valueOf(hiveSyncTableStrategy)) {
- case RO :
+ case RO:
// sync a RO table for MOR
syncHoodieTable(tableName, false, true);
break;
- case RT :
+ case RT:
// sync a RT table for MOR
syncHoodieTable(tableName, true, false);
break;
@@ -224,6 +226,48 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " +
syncClient.getBasePath()
+ " of type " + syncClient.getTableType());
+ // create database if needed
+ checkAndCreateDatabase();
+
+ final boolean tableExists = syncClient.tableExists(tableName);
+ // Get the parquet schema for this table looking at the latest commit
+ MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+ // if table exists and location of the metastore table doesn't match the
hoodie base path, recreate the table
+ if (tableExists &&
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(),
syncClient.getTableLocation(tableName))) {
+ LOG.info("basepath is updated for the table {}", tableName);
+ recreateAndSyncHiveTable(tableName, useRealtimeInputFormat,
readAsOptimized);
+ return;
+ }
+
+ boolean schemaChanged;
+ boolean propertiesChanged;
+ try {
+ if (tableExists) {
+ schemaChanged = syncSchema(tableName, schema);
+ propertiesChanged = syncProperties(tableName, useRealtimeInputFormat,
readAsOptimized, schema);
+ } else {
+ syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized,
schema);
+ schemaChanged = true;
+ propertiesChanged = true;
+ }
+
+ boolean partitionsChanged = validateAndSyncPartitions(tableName,
tableExists);
+ boolean meetSyncConditions = schemaChanged || propertiesChanged ||
partitionsChanged;
+ if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) ||
meetSyncConditions) {
+ syncClient.updateLastCommitTimeSynced(tableName);
+ }
+ LOG.info("Sync complete for {}", tableName);
+ } catch (HoodieHiveSyncException ex) {
+ if (shouldRecreateAndSyncTable()) {
+ LOG.warn("failed to sync the table {}, trying to recreate", tableName,
ex);
+ recreateAndSyncHiveTable(tableName, useRealtimeInputFormat,
readAsOptimized);
+ } else {
+ throw new HoodieHiveSyncException("failed to sync the table " +
tableName, ex);
+ }
+ }
+ }
+
+ private void checkAndCreateDatabase() {
// check if the database exists else create it
if (config.getBoolean(HIVE_AUTO_CREATE_DATABASE)) {
try {
@@ -240,22 +284,9 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
throw new HoodieHiveSyncException("hive database does not exist " +
databaseName);
}
}
+ }
- final boolean tableExists = syncClient.tableExists(tableName);
-
- // Get the parquet schema for this table looking at the latest commit
- MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
- boolean schemaChanged;
- boolean propertiesChanged;
- if (tableExists) {
- schemaChanged = syncSchema(tableName, schema);
- propertiesChanged = syncProperties(tableName, useRealtimeInputFormat,
readAsOptimized, schema);
- } else {
- syncFirstTime(tableName, useRealtimeInputFormat, readAsOptimized,
schema);
- schemaChanged = true;
- propertiesChanged = true;
- }
-
+ private boolean validateAndSyncPartitions(String tableName, boolean
tableExists) {
boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
@@ -290,12 +321,34 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
+ return partitionsChanged;
+ }
+
+ protected boolean shouldRecreateAndSyncTable() {
+ return config.getBooleanOrDefault(RECREATE_HIVE_TABLE_ON_ERROR);
+ }
- boolean meetSyncConditions = schemaChanged || propertiesChanged ||
partitionsChanged;
- if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
+ private void recreateAndSyncHiveTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) {
+ LOG.info("recreating and syncing the table {}", tableName);
+ MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+ try {
+ createOrReplaceTable(tableName, useRealtimeInputFormat, readAsOptimized,
schema);
+ syncAllPartitions(tableName);
syncClient.updateLastCommitTimeSynced(tableName);
+ } catch (HoodieHiveSyncException ex) {
+ throw new HoodieHiveSyncException("failed to recreate the table for " +
tableName, ex);
}
- LOG.info("Sync complete for " + tableName);
+ }
+
+ private void createOrReplaceTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized, MessageType schema) {
+ HoodieFileFormat baseFileFormat =
HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+ String inputFormatClassName = getInputFormatClassName(baseFileFormat,
useRealtimeInputFormat,
config.getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT));
+ String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+ String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+ Map<String, String> serdeProperties = getSerdeProperties(readAsOptimized);
+ Map<String, String> tableProperties = getTableProperties(schema);
+ syncClient.createOrReplaceTable(tableName, schema, inputFormatClassName,
+ outputFormatClassName, serDeFormatClassName, serdeProperties,
tableProperties);
}
private Map<String, String> getTableProperties(MessageType schema) {
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 32ad873a83d3..1fe00d1c53c1 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -228,6 +228,35 @@ public class HoodieHiveSyncClient extends HoodieSyncClient
{
}
}
+ public void createOrReplaceTable(String tableName,
+ MessageType storageSchema,
+ String inputFormatClass,
+ String outputFormatClass,
+ String serdeClass,
+ Map<String, String> serdeProperties,
+ Map<String, String> tableProperties) {
+
+ if (!tableExists(tableName)) {
+ createTable(tableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+ return;
+ }
+ try {
+ // create temp table
+ String tempTableName = generateTempTableName(tableName);
+ createTable(tempTableName, storageSchema, inputFormatClass,
outputFormatClass, serdeClass, serdeProperties, tableProperties);
+
+ // if create table is successful, drop the actual table
+ // and rename temp table to actual table
+ dropTable(tableName);
+
+ Table table = client.getTable(databaseName, tempTableName);
+ table.setTableName(tableName);
+ client.alter_table(databaseName, tempTableName, table);
+ } catch (Exception ex) {
+ throw new HoodieHiveSyncException("failed to create table " +
tableId(databaseName, tableName), ex);
+ }
+ }
+
@Override
public void createTable(String tableName, MessageType storageSchema, String
inputFormatClass,
String outputFormatClass, String serdeClass,
@@ -429,4 +458,24 @@ public class HoodieHiveSyncClient extends HoodieSyncClient
{
StorageDescriptor getMetastoreStorageDescriptor(String tableName) throws
TException {
return client.getTable(databaseName, tableName).getSd();
}
+
+ @Override
+ public void dropTable(String tableName) {
+ try {
+ client.dropTable(databaseName, tableName);
+ LOG.info("Successfully deleted table in Hive: {}.{}", databaseName,
tableName);
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to delete the table " +
tableId(databaseName, tableName), e);
+ }
+ }
+
+ @Override
+ public String getTableLocation(String tableName) {
+ try {
+ Table table = client.getTable(databaseName, tableName);
+ return table.getSd().getLocation();
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the basepath of the
table " + tableId(databaseName, tableName), e);
+ }
+ }
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 29bb274b015a..5bd4838a62f5 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -85,6 +86,7 @@ import static
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
+import static org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
import static
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
@@ -243,7 +245,7 @@ public class TestHiveSyncTool {
throw new HoodieHiveSyncException("Failed to get the metastore location
from the table " + tableName, e);
}
}
-
+
@ParameterizedTest
@MethodSource("syncMode")
public void testSyncAllPartition() throws Exception {
@@ -286,7 +288,7 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote");
// Drop partition with HMSDDLExecutor
try (HMSDDLExecutor hmsExecutor =
- new HMSDDLExecutor(hiveSyncConfig,
IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
+ new HMSDDLExecutor(hiveSyncConfig,
IMetaStoreClientUtil.getMSC(hiveSyncConfig.getHiveConf()))) {
hmsExecutor.dropPartitionsToTable(HiveTestUtil.TABLE_NAME,
Collections.singletonList("2010/02/03"));
}
@@ -1031,34 +1033,34 @@ public class TestHiveSyncTool {
String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(roTableName),
- "Table " + roTableName + " should not exist initially");
+ "Table " + roTableName + " should not exist initially");
assertFalse(hiveClient.tableExists(snapshotTableName),
- "Table " + snapshotTableName + " should not exist initially");
+ "Table " + snapshotTableName + " should not exist initially");
reSyncHiveTable();
switch (strategy) {
case RO:
assertFalse(hiveClient.tableExists(snapshotTableName),
- "Table " + snapshotTableName
- + " should not exist initially");
+ "Table " + snapshotTableName
+ + " should not exist initially");
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
- "Table " + HiveTestUtil.TABLE_NAME
- + " should exist after sync completes");
+ "Table " + HiveTestUtil.TABLE_NAME
+ + " should exist after sync completes");
break;
case RT:
assertFalse(hiveClient.tableExists(roTableName),
- "Table " + roTableName
- + " should not exist initially");
+ "Table " + roTableName
+ + " should not exist initially");
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
- "Table " + HiveTestUtil.TABLE_NAME
- + " should exist after sync completes");
+ "Table " + HiveTestUtil.TABLE_NAME
+ + " should exist after sync completes");
break;
default:
assertTrue(hiveClient.tableExists(roTableName),
- "Table " + roTableName
- + " should exist after sync completes");
+ "Table " + roTableName
+ + " should exist after sync completes");
assertTrue(hiveClient.tableExists(snapshotTableName),
- "Table " + snapshotTableName
- + " should exist after sync completes");
+ "Table " + snapshotTableName
+ + " should exist after sync completes");
}
}
@@ -1075,11 +1077,11 @@ public class TestHiveSyncTool {
MessageType schema = hiveClient.getStorageSchema(true);
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
- "Table " + HiveTestUtil.TABLE_NAME + " should not exist
initially");
+ "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
String initInputFormatClassName =
strategy.equals(HoodieSyncTableStrategy.RO)
- ? HoodieParquetRealtimeInputFormat.class.getName()
- : HoodieParquetInputFormat.class.getName();
+ ? HoodieParquetRealtimeInputFormat.class.getName()
+ : HoodieParquetInputFormat.class.getName();
String outputFormatClassName =
HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
String serDeFormatClassName =
HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
@@ -1087,28 +1089,28 @@ public class TestHiveSyncTool {
// Create table 'test1'.
hiveClient.createDatabase(HiveTestUtil.DB_NAME);
hiveClient.createTable(HiveTestUtil.TABLE_NAME, schema,
initInputFormatClassName,
- outputFormatClassName, serDeFormatClassName, new HashMap<>(), new
HashMap<>());
+ outputFormatClassName, serDeFormatClassName, new HashMap<>(), new
HashMap<>());
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
- "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
+ "Table " + HiveTestUtil.TABLE_NAME + " should exist initially");
String targetInputFormatClassName =
strategy.equals(HoodieSyncTableStrategy.RO)
- ? HoodieParquetInputFormat.class.getName()
- : HoodieParquetRealtimeInputFormat.class.getName();
+ ? HoodieParquetInputFormat.class.getName()
+ : HoodieParquetRealtimeInputFormat.class.getName();
StorageDescriptor storageDescriptor =
hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
assertEquals(initInputFormatClassName, storageDescriptor.getInputFormat(),
- "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " +
targetInputFormatClassName);
+ "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " +
targetInputFormatClassName);
assertFalse(storageDescriptor.getSerdeInfo().getParameters().containsKey(ConfigUtils.IS_QUERY_AS_RO_TABLE),
- "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " +
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
+ "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " +
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should not exist");
reSyncHiveTable();
storageDescriptor =
hiveClient.getMetastoreStorageDescriptor(HiveTestUtil.TABLE_NAME);
assertEquals(targetInputFormatClassName,
- storageDescriptor.getInputFormat(),
- "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " +
targetInputFormatClassName);
+ storageDescriptor.getInputFormat(),
+ "Table " + HiveTestUtil.TABLE_NAME + " inputFormat should be " +
targetInputFormatClassName);
assertEquals(storageDescriptor.getSerdeInfo().getParameters().get(ConfigUtils.IS_QUERY_AS_RO_TABLE),
- strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
- "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " +
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should be ");
+ strategy.equals(HoodieSyncTableStrategy.RO) ? "true" : "false",
+ "Table " + HiveTestUtil.TABLE_NAME + " serdeInfo parameter " +
ConfigUtils.IS_QUERY_AS_RO_TABLE + " should be ");
}
@@ -1651,4 +1653,206 @@ public class TestHiveSyncTool {
private int getPartitionFieldSize() {
return
hiveSyncProps.getString(META_SYNC_PARTITION_FIELDS.key()).split(",").length;
}
+
+ @ParameterizedTest
+ @MethodSource("syncMode")
+ public void testSyncHoodieTableCatchRuntimeException(String syncMode) throws
Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+ // Create a tool instance to initialize the sync client
+ HiveSyncTool tool = new HiveSyncTool(hiveSyncProps, getHiveConf());
+ try {
+ // Corrupt the table by deleting the commit file to cause a
RuntimeException during sync
+ // This will cause doSync() to fail when it tries to read the storage
schema
+ Path metadataPath = new Path(HiveTestUtil.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME);
+ Path instantPath = new Path(metadataPath, commitTime + ".commit");
+ if (HiveTestUtil.fileSystem.exists(instantPath)) {
+ HiveTestUtil.fileSystem.delete(instantPath, false);
+ }
+
+ // Verify that syncHoodieTable catches RuntimeException and wraps it in
HoodieException
+ HoodieException exception = assertThrows(HoodieException.class, () -> {
+ tool.syncHoodieTable();
+ }, "syncHoodieTable should throw HoodieException when RuntimeException
occurs");
+
+ // Verify the exception message contains the table name
+ assertTrue(exception.getMessage().contains(HiveTestUtil.TABLE_NAME),
+ "Exception message should contain table name: " +
HiveTestUtil.TABLE_NAME);
+ assertTrue(exception.getMessage().contains("Got runtime exception when
hive syncing"),
+ "Exception message should contain expected error message");
+
+ // Verify that the cause is a RuntimeException
+ assertTrue(exception.getCause() instanceof RuntimeException,
+ "The cause should be a RuntimeException");
+ } finally {
+ // Ensure tool is closed even if exception is thrown
+ tool.close();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncMode")
+ public void testRecreateAndSyncHiveTableOnError(String syncMode) throws
Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(RECREATE_HIVE_TABLE_ON_ERROR.key(), "true");
+
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
+
+ // First sync should succeed
+ reSyncHiveTable();
+ assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should exist after first sync");
+ assertEquals(commitTime,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be updated in the
TBLPROPERTIES");
+
+ // Verify the Hoodie table structure is intact
+ Path hoodieMetaPath = new Path(HiveTestUtil.basePath,
HoodieTableMetaClient.METAFOLDER_NAME);
+ assertTrue(HiveTestUtil.fileSystem.exists(hoodieMetaPath),
+ "Hoodie metadata folder should exist at " + hoodieMetaPath);
+
+ // Corrupt the Hive table by dropping it and creating it with incompatible
serde/input format
+ // This will cause syncProperties to potentially fail when trying to
update to Parquet formats
+ ddlExecutor.runSQL("DROP TABLE IF EXISTS `" + HiveTestUtil.TABLE_NAME +
"`");
+
+ // Create table with correct schema but wrong input/output format
+ // The location must point to the correct basePath to keep Hoodie metadata
accessible
+ String createTableSql = String.format(
+ "CREATE TABLE `%s` (`name` STRING, `favorite_number` INT,
`favorite_color` STRING) "
+ + "PARTITIONED BY (`datestr` STRING) "
+ + "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' "
+ + "OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' "
+ + "LOCATION '%s'",
+ HiveTestUtil.TABLE_NAME, HiveTestUtil.basePath);
+ ddlExecutor.runSQL(createTableSql);
+
+ // Now sync should attempt to update the serde properties to use Parquet
formats
+ // If this fails with HoodieHiveSyncException, recreateAndSyncHiveTable
should be called
+ // which will drop and recreate the table with correct formats
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ // Verify the table was recreated and synced successfully
+ assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
+ "Table " + HiveTestUtil.TABLE_NAME + " should exist after recreation");
+
+ // Verify the schema is correct
+ Map<String, String> schema =
hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME);
+ assertTrue(schema.containsKey("name"), "Schema should contain 'name'
field");
+ assertEquals("string", schema.get("name").toLowerCase(),
+ "Schema should have correct type for 'name' field after recreation");
+ // Verify the last commit time is synced, indicating successful recreation
and sync
+ assertEquals(commitTime,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be updated after recreation");
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ public void testRecreateCOWTableOnBasePathChange(String syncMode, String
enablePushDown) throws Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ String commitTime1 = "100";
+ HiveTestUtil.createCOWTable(commitTime1, 5, true);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ String commitTime2 = "105";
+ // let's update the basepath
+ basePath = Files.createTempDirectory("hivesynctest_new" +
Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ // let's create new table in new basepath
+ HiveTestUtil.createCOWTable(commitTime2, 2, true);
+ // Now lets create more partitions and these are the only ones which needs
to be synced
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "110";
+ // let's add 2 more partitions to the new basepath
+ HiveTestUtil.addCOWPartitions(2, false, true, dateTime, commitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+ // after reinitializing hive client, table location shouldn't match hoodie
base path
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location
should match hoodie basepath");
+
+ // Lets do the sync
+ reSyncHiveTable();
+ // verify partition count should be 4 from new basepath, not 5 from old
+ assertEquals(4,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+ "the 4 partitions from new base path should be present for hive");
+ // verify last commit time synced
+ assertEquals(commitTime3,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+ "The last commit that was synced should be 110");
+ // table location now should be updated to latest hoodie basepath
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(HiveTestUtil.TABLE_NAME), "new table location
should match hoodie basepath");
+ }
+
+ @ParameterizedTest
+ @MethodSource("syncModeAndSchemaFromCommitMetadata")
+ public void testSyncMergeOnReadWithBasePathChange(boolean
useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws
Exception {
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+
+ String instantTime = "100";
+ String deltaCommitTime = "101";
+ HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+ useSchemaFromCommitMetadata);
+
+ String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+ String rtTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+ reInitHiveSyncClient();
+ assertFalse(hiveClient.tableExists(roTableName), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
+ assertFalse(hiveClient.tableExists(rtTableName), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // change the hoodie base path
+ basePath = Files.createTempDirectory("hivesynctest_new" +
Instant.now().toEpochMilli()).toUri().toString();
+ hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
+
+ String instantTime2 = "102";
+ String deltaCommitTime2 = "103";
+ // let's create MOR table in the new basepath
+ HiveTestUtil.createMORTable(instantTime2, deltaCommitTime2, 2, true,
+ useSchemaFromCommitMetadata);
+
+ // let's add more partitions in the new basepath
+ ZonedDateTime dateTime = ZonedDateTime.now().plusDays(6);
+ String commitTime3 = "104";
+ String deltaCommitTime3 = "105";
+ HiveTestUtil.addMORPartitions(2, true, false,
+ useSchemaFromCommitMetadata, dateTime, commitTime3, deltaCommitTime3);
+
+ // reinitialize hive client
+ reInitHiveSyncClient();
+ // verify table location is different from hoodie basepath
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(roTableName), "ro table location should not match
hoodie base path before sync");
+ assertNotEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(rtTableName), "rt table location should not match
hoodie base path before sync");
+ // Lets do the sync
+ reSyncHiveTable();
+
+ // verify partition count should be 4, not 5 from old basepath
+ assertEquals(4, hiveClient.getAllPartitions(roTableName).size(),
+ "the 4 partitions from new base path should be present for ro table");
+ assertEquals(4, hiveClient.getAllPartitions(rtTableName).size(),
+ "the 4 partitions from new base path should be present for rt table");
+ // verify last synced commit time
+ assertEquals(deltaCommitTime3,
hiveClient.getLastCommitTimeSynced(roTableName).get(),
+ "The last commit that was synced should be 103");
+ assertEquals(deltaCommitTime3,
hiveClient.getLastCommitTimeSynced(rtTableName).get(),
+ "The last commit that was synced should be 103");
+ // verify table location is updated to the new hoodie basepath
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(roTableName), "ro table location should match
hoodie base path after sync");
+ assertEquals(hiveClient.getBasePath(),
hiveClient.getTableLocation(rtTableName), "rt table location should match
hoodie base path after sync");
+ }
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index 87af1d16d75c..c1d1f5bf22c2 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -20,6 +20,7 @@
package org.apache.hudi.sync.common;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
@@ -56,6 +57,27 @@ public interface HoodieMetaSyncOperations {
}
  /**
   * Create or replace the table.
   *
   * <p>No-op by default; metastore-specific sync clients override this to
   * atomically swap in a freshly created table (e.g. when recreating a table
   * whose metadata has become inconsistent with storage).
   *
   * @param tableName The table name.
   * @param storageSchema The table schema.
   * @param inputFormatClass The input format class of this table.
   * @param outputFormatClass The output format class of this table.
   * @param serdeClass The serde class of this table.
   * @param serdeProperties The serde properties of this table.
   * @param tableProperties The table properties for this table.
   */
  default void createOrReplaceTable(String tableName,
                                    MessageType storageSchema,
                                    String inputFormatClass,
                                    String outputFormatClass,
                                    String serdeClass,
                                    Map<String, String> serdeProperties,
                                    Map<String, String> tableProperties) {
    // Intentionally empty: implementations that support replace override this.
  }
+
/**
* Check if table exists in metastore.
*/
@@ -165,6 +187,13 @@ public interface HoodieMetaSyncOperations {
return Collections.emptyList();
}
  /**
   * Get the base path of the table from metastore.
   *
   * @param tableName The table name.
   * @return the table's location as stored in the metastore; the default
   *         implementation returns an empty string for clients that do not
   *         track table locations.
   */
  default String getTableLocation(String tableName) {
    return StringUtils.EMPTY_STRING;
  }
+
/**
* Update the field comments for table in metastore, by using the ones from
storage.
*
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 4c5fb01b9e75..6ec35c435a05 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -56,6 +57,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
protected final HoodieSyncConfig config;
protected final PartitionValueExtractor partitionValueExtractor;
protected final HoodieTableMetaClient metaClient;
+ private static final String TEMP_SUFFIX = "_temp";
public HoodieSyncClient(HoodieSyncConfig config) {
this.config = config;
@@ -244,4 +246,8 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
}
return paths;
}
+
+ protected String generateTempTableName(String tableName) {
+ return tableName + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond();
+ }
}
diff --git a/packaging/bundle-validation/docker_java17/docker_java17_test.sh
b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
index 7fcc9e5000e3..ce32ca4fa4ac 100755
--- a/packaging/bundle-validation/docker_java17/docker_java17_test.sh
+++ b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
@@ -51,28 +51,18 @@ start_datanode () {
echo "::warning::docker_test_java17.sh starting datanode:"$DN
- cat $HADOOP_HOME/hadoop/etc/hdfs-site.xml
- cat $HADOOP_HOME/hadoop/etc/core-site.xml
-
DN_DIR_PREFIX=$DOCKER_TEST_DIR/additional_datanode/
PID_DIR=$DOCKER_TEST_DIR/pid/$1
- if [ -z $DN_DIR_PREFIX ]; then
- mkdir -p $DN_DIR_PREFIX
- fi
-
- if [ -z $PID_DIR ]; then
- mkdir -p $PID_DIR
- fi
-
- export HADOOP_PID_DIR=$PID_PREFIX
+ mkdir -p $DN_DIR_PREFIX $PID_DIR
+ export HADOOP_PID_DIR=$PID_DIR
DN_CONF_OPTS="\
-Dhadoop.tmp.dir=$DN_DIR_PREFIX$DN\
-Ddfs.datanode.address=localhost:5001$DN \
-Ddfs.datanode.http.address=localhost:5008$DN \
-Ddfs.datanode.ipc.address=localhost:5002$DN"
- $HADOOP_HOME/bin/hdfs --daemon start datanode $DN_CONF_OPTS
- $HADOOP_HOME/bin/hdfs dfsadmin -report
+ bash $HADOOP_HOME/bin/hdfs --daemon start datanode $DN_CONF_OPTS
+ bash $HADOOP_HOME/bin/hdfs dfsadmin -report
}
setup_hdfs () {
@@ -80,20 +70,41 @@ setup_hdfs () {
mv /opt/bundle-validation/tmp-conf-dir/hdfs-site.xml
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
mv /opt/bundle-validation/tmp-conf-dir/core-site.xml
$HADOOP_HOME/etc/hadoop/core-site.xml
- $HADOOP_HOME/bin/hdfs namenode -format
- $HADOOP_HOME/bin/hdfs --daemon start namenode
- echo "::warning::docker_test_java17.sh starting hadoop hdfs"
- $HADOOP_HOME/sbin/start-dfs.sh
+ mkdir -p $DOCKER_TEST_DIR/pid
+ export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid
- # start datanodes
+ bash $HADOOP_HOME/bin/hdfs namenode -format
+ bash $HADOOP_HOME/bin/hdfs --daemon start namenode
+
+ echo "::warning::docker_test_java17.sh waiting for NameNode to start"
+ NAMENODE_READY=0
+ for i in $(seq 1 30); do
+ if bash $HADOOP_HOME/bin/hdfs dfsadmin -report >/dev/null 2>&1; then
+ NAMENODE_READY=1
+ break
+ fi
+ sleep 2
+ done
+ if [ "$NAMENODE_READY" -ne 1 ]; then
+ echo "::error::docker_test_java17.sh NameNode failed to start"
+ exit 1
+ fi
+
+ echo "::warning::docker_test_java17.sh starting default datanode"
+ bash $HADOOP_HOME/bin/hdfs --daemon start datanode
+
+ echo "::warning::docker_test_java17.sh waiting for DataNode to register"
+ sleep 10
+
+ # start additional datanodes
for i in $(seq 1 3)
do
start_datanode $i
done
echo "::warning::docker_test_java17.sh starting hadoop hdfs, hdfs report"
- $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/root
- $HADOOP_HOME/bin/hdfs dfs -ls /user/
+ bash $HADOOP_HOME/bin/hdfs dfs -mkdir -p /user/root
+ bash $HADOOP_HOME/bin/hdfs dfs -ls /user/
if [ "$?" -ne 0 ]; then
echo "::error::docker_test_java17.sh Failed setting up HDFS!"
exit 1
@@ -103,7 +114,14 @@ setup_hdfs () {
stop_hdfs() {
  use_default_java_runtime
  echo "::warning::docker_test_java17.sh stopping hadoop hdfs"
  # Stop the default datanode; fall back to the shared pid dir if
  # HADOOP_PID_DIR was not exported by setup_hdfs in this shell.
  export HADOOP_PID_DIR=${HADOOP_PID_DIR:-$DOCKER_TEST_DIR/pid}
  bash $HADOOP_HOME/bin/hdfs --daemon stop datanode 2>/dev/null || true
  # Stop the three additional datanodes, each started with its own pid dir
  # (see start_datanode). "|| true" keeps teardown going if one is already down.
  for i in 1 2 3; do
    export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid/$i
    bash $HADOOP_HOME/bin/hdfs --daemon stop datanode 2>/dev/null || true
  done
  # Restore the shared pid dir and stop the namenode last.
  export HADOOP_PID_DIR=$DOCKER_TEST_DIR/pid
  bash $HADOOP_HOME/bin/hdfs --daemon stop namenode 2>/dev/null || true
}
build_hudi_java8 () {
@@ -123,7 +141,14 @@ build_hudi_java8 () {
mkdir -p $JARS_DIR
fi
- cp ./packaging/hudi-spark-bundle/target/hudi-spark*.jar $JARS_DIR/spark.jar
+ echo "::warning::docker_test_java17.sh copy hudi-spark bundle jar to target
folder"
+ cp ./packaging/hudi-spark-bundle/target/hudi-spark*SNAPSHOT.jar
$JARS_DIR/spark.jar
+ echo "::warning::docker_test_java17.sh copy hudi-spark bundle jar to target
folder DONE"
+
+ if [ "$?" -ne 0 ]; then
+ echo "::error::docker_test_java17.sh Failed to copy hudi-spark bundle jar
to target folder"
+ exit 1
+ fi
}
run_docker_tests() {