This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b44e19cc2fd [HUDI-8930] Fixing concurrency handling during upgrade
(#12737)
b44e19cc2fd is described below
commit b44e19cc2fda5ec3c4eeea649399677a187c4fa8
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jan 30 00:32:25 2025 -0800
[HUDI-8930] Fixing concurrency handling during upgrade (#12737)
* minor fixes to upgrade path
* Fixes for concurrency handling during upgrade
* fix build failure
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../hudi/table/upgrade/UpgradeDowngradeUtils.java | 15 ++-
.../apache/hudi/config/TestHoodieWriteConfig.java | 37 ++++++
.../client/transaction/lock/NoopLockProvider.java | 84 ++++++++++++
.../common/config/HoodieTimeGeneratorConfig.java | 18 +++
.../hudi/common/table/timeline/TimeGenerators.java | 22 +++-
.../transaction/lock/TestNoopLockProvider.java | 145 +++++++++++++++++++++
.../table/timeline/TestWaitBasedTimeGenerator.java | 29 +++++
.../hudi/functional/TestSevenToEightUpgrade.scala | 23 +++-
9 files changed, 360 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 85a3a1f6bc0..ddc0aa8bc46 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3496,7 +3496,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private void autoAdjustConfigsForConcurrencyMode(boolean
isLockProviderPropertySet) {
// for a single writer scenario, with all table services inline, lets
set InProcessLockProvider
- if (writeConfig.getWriteConcurrencyMode() ==
WriteConcurrencyMode.SINGLE_WRITER && !writeConfig.areAnyTableServicesAsync()) {
+ if (writeConfig.isAutoAdjustLockConfigs() &&
writeConfig.getWriteConcurrencyMode() == WriteConcurrencyMode.SINGLE_WRITER &&
!writeConfig.areAnyTableServicesAsync()) {
if (writeConfig.getLockProviderClass() != null &&
!writeConfig.getLockProviderClass().equals(InProcessLockProvider.class.getCanonicalName()))
{
// add logs only when explicitly overridden by the user.
LOG.warn(String.format("For a single writer mode, overriding lock
provider class (%s) to %s. So, user configured lock provider %s may not take
effect",
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index e5618d69a35..08a0ce4b1d2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -166,9 +167,17 @@ public class UpgradeDowngradeUtils {
// set required configs for rollback
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
// NOTE: at this stage rollback should use the current writer version
and disable auto upgrade/downgrade
- TypedProperties properties = config.getProps();
- properties.remove(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key());
- properties.remove(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key());
+ TypedProperties properties = new TypedProperties();
+ properties.putAll(config.getProps());
+ // TimeGenerators are cached and re-used based on table base path. Since
here we are changing the lock configurations, we avoid using the cache
+ // for the upgrade code block.
+
properties.put(HoodieTimeGeneratorConfig.TIME_GENERATOR_REUSE_ENABLE.key(),"false");
+ // Override with NoopLockProvider to avoid re-entrant locking: the upgrade
is already happening within the table level lock.
+ // Below we trigger rollback and compaction, which might again try to
acquire the lock. So, here we explicitly override to
+ // NoopLockProvider for just the upgrade code block.
+
properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),"org.apache.hudi.client.transaction.lock.NoopLockProvider");
+ // If auto adjust is not disabled, the NoopLockProvider set above may get
overridden with InProcessLockProvider for single-writer use cases.
+ properties.put("hoodie.auto.adjust.lock.configs","false");
HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
.withProps(properties)
.withWriteTableVersion(tableVersion.versionCode())
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 69a48e2fd7b..cb46cec8242 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -20,6 +20,7 @@ package org.apache.hudi.config;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.transaction.lock.NoopLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
@@ -529,6 +530,42 @@ public class TestHoodieWriteConfig {
HoodieFailedWritesCleaningPolicy.LAZY,
FileSystemBasedLockProviderTestClass.class.getName());
}
+ @Test
+ public void testTimeGeneratorConfig() {
+
+ HoodieWriteConfig writeConfig = createWriteConfig(new HashMap<String,
String>() {
+ {
+ put(HoodieTableConfig.TYPE.key(),
HoodieTableType.COPY_ON_WRITE.name());
+ }
+ });
+
+ // validate the InProcessLockProvider kicks in if no explicit lock
provider is set.
+ assertEquals(InProcessLockProvider.class.getName(),
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+ writeConfig = createWriteConfig(new HashMap<String, String>() {
+ {
+ put(HoodieTableConfig.TYPE.key(),
HoodieTableType.COPY_ON_WRITE.name());
+ put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
NoopLockProvider.class.getName());
+ }
+ });
+
+ // validate that the configured lock provider is honored by the
TimeGeneratorConfig as well.
+ assertEquals(NoopLockProvider.class.getName(),
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+ // if auto adjust lock config is enabled, for a single writer w/ all
inline table services, the configured lock provider is overridden with InProcessLockProvider
+ writeConfig = createWriteConfig(new HashMap<String, String>() {
+ {
+ put(HoodieTableConfig.TYPE.key(),
HoodieTableType.COPY_ON_WRITE.name());
+ put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
NoopLockProvider.class.getName());
+ put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
+ }
+ });
+
+ // validate the InProcessLockProvider kicks in due to auto adjust lock
configs
+ assertEquals(InProcessLockProvider.class.getName(),
writeConfig.getTimeGeneratorConfig().getLockConfiguration().getConfig().getProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()));
+
+ }
+
@Test
public void testConsistentBucketIndexDefaultClusteringConfig() {
Properties props = new Properties();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
new file mode 100644
index 00000000000..65c6aa32b32
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/client/transaction/lock/NoopLockProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * NoopLockProvider, as the name suggests, is a no-op lock provider: any caller
+ * asking for a lock is granted it immediately ({@code tryLock} always returns true,
+ * {@code lock}/{@code unlock} do nothing), so no mutual exclusion is enforced.
+ * {@code getLock()} hands back a newly created {@code ReentrantReadWriteLock} on every call.
+ * This is not meant to be used as a production-grade lock provider; it is
+ * meant to be used for Hudi's internal operations.
+ * For example, during upgrade we have nested lock situations and we leverage this
+ * {@code NoopLockProvider} for any operations we
+ * might want to do within the upgradeHandler blocks to avoid re-entrant
+ * situations. Not all lock providers might support re-entrancy and during upgrade,
+ * it is expected to have a single writer to the Hudi table of interest.
+ */
+public class NoopLockProvider implements LockProvider<ReentrantReadWriteLock>,
Serializable {
+
+ public NoopLockProvider(final LockConfiguration lockConfiguration, final
StorageConfiguration<?> conf) {
+ // no op: neither the lock configuration nor the storage configuration is used.
+ }
+
+ @Override
+ public boolean tryLock(long time, @NotNull TimeUnit unit) throws
InterruptedException {
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ // no op: nothing is ever acquired, so there is nothing to release.
+ }
+
+ @Override
+ public void lockInterruptibly() {
+ // no op: returns immediately.
+ }
+
+ @Override
+ public void lock() {
+ // no op: returns immediately.
+ }
+
+ @Override
+ public boolean tryLock() {
+ return true;
+ }
+
+ @Override
+ public ReentrantReadWriteLock getLock() {
+ return new ReentrantReadWriteLock();
+ }
+
+ @Override
+ public String getCurrentOwnerLockInfo() {
+ return StringUtils.EMPTY_STRING;
+ }
+
+ @Override
+ public void close() {
+ // no op: this provider keeps no state to clean up.
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
index d48a9d0d8bc..16ebff9135b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTimeGeneratorConfig.java
@@ -56,6 +56,15 @@ public class HoodieTimeGeneratorConfig extends HoodieConfig {
.withDocumentation("The max expected clock skew time in ms between two
processes generating time. Used by "
+ TimeGeneratorType.WAIT_TO_ADJUST_SKEW.name() + " time generator to
implement TrueTime semantics.");
+ public static final ConfigProperty<Boolean> TIME_GENERATOR_REUSE_ENABLE =
ConfigProperty
+ .key("_hoodie.time.generator.reuse.enable")
+ .defaultValue(true)
+ .sinceVersion("1.0.1")
+ .markAdvanced()
+ .withDocumentation("Used only for internal purposes. TimeGeneratos are
cached per table base path and re-used across invocations. "
+ + "For some internal purposes, we wanted to avoid using the cached
TimeGenerator (like upgrade flows). Hence this config "
+ + "will be used internally during upgrade flow. No advisable for end
user to use this config. ");
+
private HoodieTimeGeneratorConfig() {
super();
}
@@ -68,6 +77,10 @@ public class HoodieTimeGeneratorConfig extends HoodieConfig {
return getLong(MAX_EXPECTED_CLOCK_SKEW_MS);
}
+ public boolean canReuseTimeGenerator() {
+ return getBoolean(TIME_GENERATOR_REUSE_ENABLE);
+ }
+
public String getBasePath() {
return getString(BASE_PATH);
}
@@ -108,6 +121,11 @@ public class HoodieTimeGeneratorConfig extends
HoodieConfig {
return this;
}
+ public Builder withReuseTimeGenerator(boolean reuseTimeGenerator) {
+ timeGeneratorConfig.setValue(TIME_GENERATOR_REUSE_ENABLE,
String.valueOf(reuseTimeGenerator));
+ return this;
+ }
+
public Builder withPath(String basePath) {
timeGeneratorConfig.setValue(BASE_PATH, basePath);
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
index 2dce07dc267..b410af6f453 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGenerators.java
@@ -42,12 +42,20 @@ public class TimeGenerators {
StorageConfiguration<?>
storageConf) {
ValidationUtils.checkState(timeGeneratorConfig.contains(BASE_PATH),
"Option [" + BASE_PATH.key() + "] is required");
ValidationUtils.checkArgument(storageConf != null, "Hadoop configuration
is required");
- return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s -> {
- TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType();
- if (Objects.requireNonNull(type) ==
TimeGeneratorType.WAIT_TO_ADJUST_SKEW) {
- return new SkewAdjustingTimeGenerator(timeGeneratorConfig,
storageConf);
- }
- throw new IllegalArgumentException("Unsupported TimeGenerator Type " +
type);
- });
+ if (timeGeneratorConfig.canReuseTimeGenerator()) {
+ return TIME_GENERATOR_CACHE.get(timeGeneratorConfig.getBasePath(), s ->
getNewTimeGenerator(timeGeneratorConfig, storageConf));
+ } else {
+ return getNewTimeGenerator(timeGeneratorConfig, storageConf);
+ }
+ }
+
+ private static TimeGenerator getNewTimeGenerator(HoodieTimeGeneratorConfig
timeGeneratorConfig,
+ StorageConfiguration<?>
storageConf) {
+ // always builds a fresh instance for the configured type; whether it is cached is decided by the caller.
+ TimeGeneratorType type = timeGeneratorConfig.getTimeGeneratorType();
+ if (Objects.requireNonNull(type) == TimeGeneratorType.WAIT_TO_ADJUST_SKEW)
{
+ return new SkewAdjustingTimeGenerator(timeGeneratorConfig, storageConf);
+ }
+ throw new IllegalArgumentException("Unsupported TimeGenerator Type " +
type);
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
new file mode 100644
index 00000000000..e27da253eb7
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/client/transaction/lock/TestNoopLockProvider.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Tests {@code NoopLockProvider}.
+ * Every test only asserts that {@code lock()} / {@code unlock()} calls return without throwing:
+ * repeatedly on one provider, interleaved across two providers built from different lock
+ * configurations, and from a second thread after the main thread has already called {@code lock()}.
+ */
+public class TestNoopLockProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestNoopLockProvider.class);
+ private final StorageConfiguration<?> storageConf = getDefaultStorageConf();
+ private final LockConfiguration lockConfiguration1;
+ private final LockConfiguration lockConfiguration2;
+
+ public TestNoopLockProvider() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(HoodieCommonConfig.BASE_PATH.key(), "table1");
+ lockConfiguration1 = new LockConfiguration(properties);
+ properties.put(HoodieCommonConfig.BASE_PATH.key(), "table2");
+ lockConfiguration2 = new LockConfiguration(properties);
+ }
+
+ @Test
+ public void testLockAcquisition() {
+ NoopLockProvider noopLockProvider = new
NoopLockProvider(lockConfiguration1, storageConf);
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider.unlock();
+ });
+ }
+
+ @Test
+ public void testLockReAcquisitionBySameThread() {
+ NoopLockProvider noopLockProvider = new
NoopLockProvider(lockConfiguration1, storageConf);
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+ }
+
+ @Test
+ public void testLockReAcquisitionBySameThreadWithTwoTables() {
+ NoopLockProvider noopLockProvider1 = new
NoopLockProvider(lockConfiguration1, storageConf);
+ NoopLockProvider noopLockProvider2 = new
NoopLockProvider(lockConfiguration2, storageConf);
+
+ assertDoesNotThrow(() -> {
+ noopLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider2.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider1.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider2.unlock();
+ });
+ }
+
+ @Test
+ public void testLockReAcquisitionByDifferentThread() {
+ NoopLockProvider noopLockProvider = new
NoopLockProvider(lockConfiguration1, storageConf);
+ final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+
+ // Main test thread
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+
+ // Another writer thread in parallel, should be able to acquire the lock
instantly
+ Thread writer2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ noopLockProvider.lock();
+ });
+ assertDoesNotThrow(() -> {
+ noopLockProvider.unlock();
+ });
+ writer2Completed.set(true);
+ }
+ });
+ writer2.start();
+
+ assertDoesNotThrow(() -> {
+ noopLockProvider.unlock();
+ });
+
+ try {
+ writer2.join();
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt is swallowed here; the assertion right after this block still checks that writer2 finished.
+ }
+ Assertions.assertTrue(writer2Completed.get());
+
+ writer2.interrupt();
+ }
+}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
index f6e92943cdf..d776229e63e 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestWaitBasedTimeGenerator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -34,6 +35,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
public class TestWaitBasedTimeGenerator {
public static class MockInProcessLockProvider extends InProcessLockProvider {
@@ -145,4 +149,29 @@ public class TestWaitBasedTimeGenerator {
Assertions.assertTrue(t2Timestamp.get() < t1Timestamp.get());
}
}
+
+ @Test
+ public void testTimeGeneratorCache() {
+ TimeGenerator timeGenerator1 =
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+ TimeGenerator timeGenerator2 =
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+ TimeGenerator timeGenerator3 =
TimeGenerators.getTimeGenerator(timeGeneratorConfig, storageConf);
+
+ assertEquals(timeGenerator1, timeGenerator2);
+ assertEquals(timeGenerator1, timeGenerator3);
+
+ // disable reuse via the builder flag, so lookups with this config are expected to return a different generator
+ HoodieTimeGeneratorConfig timeGeneratorConfigWithNoReuse =
HoodieTimeGeneratorConfig.newBuilder()
+ .withPath("test_wait_based")
+ .withMaxExpectedClockSkewMs(25L)
+ .withReuseTimeGenerator(false)
+ .withTimeGeneratorType(TimeGeneratorType.WAIT_TO_ADJUST_SKEW)
+ .build();
+
+ TimeGenerator timeGenerator4 =
TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf);
+ assertNotEquals(timeGenerator1, timeGenerator4);
+ // no matter how many times we call, we should get a new time generator each time
+ TimeGenerator timeGenerator5 =
TimeGenerators.getTimeGenerator(timeGeneratorConfigWithNoReuse, storageConf);
+ assertNotEquals(timeGenerator4, timeGenerator5);
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 862cd4f9ae9..5779bdeb93f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -22,21 +22,30 @@ import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieRecordMerger, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
+import org.apache.hudi.config.HoodieLockConfig
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
@ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
- def testPartitionFieldsWithUpgrade(tableType: HoodieTableType): Unit = {
+ @CsvSource(value = Array(
+ "COPY_ON_WRITE,null",
+
"COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+ "COPY_ON_WRITE,org.apache.hudi.client.transaction.lock.NoopLockProvider",
+ "MERGE_ON_READ,null",
+
"MERGE_ON_READ,org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+ "MERGE_ON_READ,org.apache.hudi.client.transaction.lock.NoopLockProvider"
+ ))
+ def testPartitionFieldsWithUpgrade(tableType: HoodieTableType,
lockProviderClass: String): Unit = {
val partitionFields = "partition:simple"
// Downgrade handling for metadata not yet ready.
- val hudiOpts = commonOpts ++ Map(
+ val hudiOptsWithoutLockConfigs = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
KeyGeneratorType.CUSTOM.getClassName,
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> partitionFields,
@@ -45,6 +54,12 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key ->
classOf[OverwriteWithLatestAvroPayload].getName,
DataSourceWriteOptions.RECORD_MERGE_MODE.key ->
RecordMergeMode.COMMIT_TIME_ORDERING.name)
+ val hudiOpts = if (!lockProviderClass.equals("null")) {
+ hudiOptsWithoutLockConfigs ++
Map(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> lockProviderClass)
+ } else {
+ hudiOptsWithoutLockConfigs
+ }
+
doWriteAndValidateDataAndRecordIndex(hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite,