This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.14.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.14.1 by this push: new 52309055f0c Revert "Add cachedSchema per batch, fix idempotency with getSourceSchema calls" 52309055f0c is described below commit 52309055f0ccac2f860c9f784e0610095f7d5d1d Author: sivabalan <n.siv...@gmail.com> AuthorDate: Sat Dec 23 18:59:55 2023 -0800 Revert "Add cachedSchema per batch, fix idempotency with getSourceSchema calls" This reverts commit dff42eb468cafe43e9208c0ae738c91184ded673. --- .../utilities/schema/FilebasedSchemaProvider.java | 29 +++++------------ .../hudi/utilities/schema/SchemaProvider.java | 5 --- .../utilities/schema/SchemaRegistryProvider.java | 36 +++++----------------- .../apache/hudi/utilities/streamer/StreamSync.java | 5 +-- .../schema/TestSchemaRegistryProvider.java | 20 ------------ 5 files changed, 16 insertions(+), 79 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java index 9dbf66325d7..3ca97b01f95 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java @@ -45,11 +45,6 @@ public class FilebasedSchemaProvider extends SchemaProvider { private final FileSystem fs; - private final String sourceFile; - private final String targetFile; - private final boolean shouldSanitize; - private final String invalidCharMask; - protected Schema sourceSchema; protected Schema targetSchema; @@ -57,21 +52,18 @@ public class FilebasedSchemaProvider extends SchemaProvider { public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); checkRequiredConfigProperties(props, Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE)); - this.sourceFile = getStringWithAltKeys(props, FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE); - this.targetFile = getStringWithAltKeys(props, FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile); - this.shouldSanitize = SanitizationUtils.shouldSanitize(props); - this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props); + String sourceFile = getStringWithAltKeys(props, FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE); + boolean shouldSanitize = SanitizationUtils.shouldSanitize(props); + String invalidCharMask = SanitizationUtils.getInvalidCharMask(props); this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true); - this.sourceSchema = parseSchema(this.sourceFile); + this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask); if (containsConfigProperty(props, FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) { - this.targetSchema = parseSchema(this.targetFile); + this.targetSchema = readAvroSchemaFromFile( + getStringWithAltKeys(props, FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE), + this.fs, shouldSanitize, invalidCharMask); } } - private Schema parseSchema(String schemaFile) { - return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize, invalidCharMask); - } - @Override public Schema getSourceSchema() { return sourceSchema; @@ -95,11 +87,4 @@ public class FilebasedSchemaProvider extends SchemaProvider { } return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask); } - - // Per write batch, refresh the schemas from the file - @Override - public void refresh() { - this.sourceSchema = parseSchema(this.sourceFile); - this.targetSchema = parseSchema(this.targetFile); - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java index 5c8ca8f6c1b..2410798d355 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java @@ -56,9 +56,4 @@ public abstract class SchemaProvider implements Serializable { // by default, use source schema as target for hoodie table as well return getSourceSchema(); } - - //every schema provider has the ability to refresh itself, which will mean something different per provider. - public void refresh() { - - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index f31e867e96e..c3541e6aab0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -82,12 +82,6 @@ public class SchemaRegistryProvider extends SchemaProvider { public static final String SSL_KEY_PASSWORD_PROP = "schema.registry.ssl.key.password"; } - protected Schema cachedSourceSchema; - protected Schema cachedTargetSchema; - - private final String srcSchemaRegistryUrl; - private final String targetSchemaRegistryUrl; - @FunctionalInterface public interface SchemaConverter { /** @@ -166,8 +160,6 @@ public class SchemaRegistryProvider extends SchemaProvider { public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); checkRequiredConfigProperties(props, Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL)); - this.srcSchemaRegistryUrl = getStringWithAltKeys(config, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL); - this.targetSchemaRegistryUrl = getStringWithAltKeys(config, HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl); if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP) || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) { setUpSSLStores(); @@ -199,42 +191,30 @@ public class SchemaRegistryProvider extends SchemaProvider { @Override public Schema getSourceSchema() { + String registryUrl = getStringWithAltKeys(config, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL); try { - if (cachedSourceSchema == null) { - cachedSourceSchema = parseSchemaFromRegistry(this.srcSchemaRegistryUrl); - } - return cachedSourceSchema; + return parseSchemaFromRegistry(registryUrl); } catch (Exception e) { throw new HoodieSchemaFetchException(String.format( "Error reading source schema from registry. Please check %s is configured correctly. Truncated URL: %s", Config.SRC_SCHEMA_REGISTRY_URL_PROP, - StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e); + StringUtils.truncate(registryUrl, 10, 10)), e); } } @Override public Schema getTargetSchema() { + String registryUrl = getStringWithAltKeys(config, HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL); + String targetRegistryUrl = + getStringWithAltKeys(config, HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl); try { - if (cachedTargetSchema == null) { - cachedTargetSchema = parseSchemaFromRegistry(this.targetSchemaRegistryUrl); - } - return cachedTargetSchema; + return parseSchemaFromRegistry(targetRegistryUrl); } catch (Exception e) { throw new HoodieSchemaFetchException(String.format( "Error reading target schema from registry. Please check %s is configured correctly. If that is not configured then check %s. Truncated URL: %s", Config.SRC_SCHEMA_REGISTRY_URL_PROP, Config.TARGET_SCHEMA_REGISTRY_URL_PROP, - StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e); + StringUtils.truncate(targetRegistryUrl, 10, 10)), e); } } - - // Per SyncOnce call, the cachedschema for the provider is dropped and SourceSchema re-attained - // Subsequent calls to getSourceSchema within the write batch should be cached. - @Override - public void refresh() { - cachedSourceSchema = null; - cachedTargetSchema = null; - getSourceSchema(); - getTargetSchema(); - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 17a0ee2e3bf..e756602b1cd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -449,10 +449,7 @@ public class StreamSync implements Serializable, Closeable { result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext); } - // refresh schemas if need be before next batch - if (schemaProvider != null) { - schemaProvider.refresh(); - } + metrics.updateStreamerSyncMetrics(System.currentTimeMillis()); return result; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java index 44421d5e059..59e04d77602 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java @@ -133,24 +133,4 @@ class TestSchemaRegistryProvider { .toString(); } } - - // The SR is checked when cachedSchema is empty, when not empty, the cachedSchema is used. - @Test - public void testGetSourceSchemaUsesCachedSchema() throws IOException { - TypedProperties props = getProps(); - SchemaRegistryProvider spyUnderTest = getUnderTest(props); - - // Call when cachedSchema is empty - Schema actual = spyUnderTest.getSourceSchema(); - assertNotNull(actual); - verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any()); - - assert spyUnderTest.cachedSourceSchema != null; - - Schema actualTwo = spyUnderTest.getSourceSchema(); - - // cachedSchema should now be set, a subsequent call should not call parseSchemaFromRegistry - // Assuming this verify() has the scope of the whole test? so it should still be 1 from previous call? - verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any()); - } }