This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.1
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.14.1 by this push:
     new 52309055f0c Revert "Add cachedSchema per batch, fix idempotency with getSourceSchema calls"
52309055f0c is described below

commit 52309055f0ccac2f860c9f784e0610095f7d5d1d
Author: sivabalan <n.siv...@gmail.com>
AuthorDate: Sat Dec 23 18:59:55 2023 -0800

    Revert "Add cachedSchema per batch, fix idempotency with getSourceSchema calls"
    
    This reverts commit dff42eb468cafe43e9208c0ae738c91184ded673.
---
 .../utilities/schema/FilebasedSchemaProvider.java  | 29 +++++------------
 .../hudi/utilities/schema/SchemaProvider.java      |  5 ---
 .../utilities/schema/SchemaRegistryProvider.java   | 36 +++++-----------------
 .../apache/hudi/utilities/streamer/StreamSync.java |  5 +--
 .../schema/TestSchemaRegistryProvider.java         | 20 ------------
 5 files changed, 16 insertions(+), 79 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 9dbf66325d7..3ca97b01f95 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -45,11 +45,6 @@ public class FilebasedSchemaProvider extends SchemaProvider {
 
   private final FileSystem fs;
 
-  private final String sourceFile;
-  private final String targetFile;
-  private final boolean shouldSanitize;
-  private final String invalidCharMask;
-
   protected Schema sourceSchema;
 
   protected Schema targetSchema;
@@ -57,21 +52,18 @@ public class FilebasedSchemaProvider extends SchemaProvider 
{
   public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) 
{
     super(props, jssc);
     checkRequiredConfigProperties(props, 
Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE));
-    this.sourceFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
-    this.targetFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE, sourceFile);
-    this.shouldSanitize = SanitizationUtils.shouldSanitize(props);
-    this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
+    String sourceFile = getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE);
+    boolean shouldSanitize = SanitizationUtils.shouldSanitize(props);
+    String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
-    this.sourceSchema = parseSchema(this.sourceFile);
+    this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, 
shouldSanitize, invalidCharMask);
     if (containsConfigProperty(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE)) {
-      this.targetSchema = parseSchema(this.targetFile);
+      this.targetSchema = readAvroSchemaFromFile(
+          getStringWithAltKeys(props, 
FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE),
+          this.fs, shouldSanitize, invalidCharMask);
     }
   }
 
-  private Schema parseSchema(String schemaFile) {
-    return readAvroSchemaFromFile(schemaFile, this.fs, shouldSanitize, 
invalidCharMask);
-  }
-
   @Override
   public Schema getSourceSchema() {
     return sourceSchema;
@@ -95,11 +87,4 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     }
     return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, 
invalidCharMask);
   }
-
-  // Per write batch, refresh the schemas from the file
-  @Override
-  public void refresh() {
-    this.sourceSchema = parseSchema(this.sourceFile);
-    this.targetSchema = parseSchema(this.targetFile);
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
index 5c8ca8f6c1b..2410798d355 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProvider.java
@@ -56,9 +56,4 @@ public abstract class SchemaProvider implements Serializable {
     // by default, use source schema as target for hoodie table as well
     return getSourceSchema();
   }
-
-  //every schema provider has the ability to refresh itself, which will mean 
something different per provider.
-  public void refresh() {
-
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index f31e867e96e..c3541e6aab0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -82,12 +82,6 @@ public class SchemaRegistryProvider extends SchemaProvider {
     public static final String SSL_KEY_PASSWORD_PROP = 
"schema.registry.ssl.key.password";
   }
 
-  protected Schema cachedSourceSchema;
-  protected Schema cachedTargetSchema;
-
-  private final String srcSchemaRegistryUrl;
-  private final String targetSchemaRegistryUrl;
-
   @FunctionalInterface
   public interface SchemaConverter {
     /**
@@ -166,8 +160,6 @@ public class SchemaRegistryProvider extends SchemaProvider {
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     checkRequiredConfigProperties(props, 
Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL));
-    this.srcSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
-    this.targetSchemaRegistryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, srcSchemaRegistryUrl);
     if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
         || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
       setUpSSLStores();
@@ -199,42 +191,30 @@ public class SchemaRegistryProvider extends 
SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
+    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
     try {
-      if (cachedSourceSchema == null) {
-        cachedSourceSchema = 
parseSchemaFromRegistry(this.srcSchemaRegistryUrl);
-      }
-      return cachedSourceSchema;
+      return parseSchemaFromRegistry(registryUrl);
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading source schema from registry. Please check %s is 
configured correctly. Truncated URL: %s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(srcSchemaRegistryUrl, 10, 10)), e);
+          StringUtils.truncate(registryUrl, 10, 10)), e);
     }
   }
 
   @Override
   public Schema getTargetSchema() {
+    String registryUrl = getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL);
+    String targetRegistryUrl =
+        getStringWithAltKeys(config, 
HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
     try {
-      if (cachedTargetSchema == null) {
-        cachedTargetSchema = 
parseSchemaFromRegistry(this.targetSchemaRegistryUrl);
-      }
-      return cachedTargetSchema;
+      return parseSchemaFromRegistry(targetRegistryUrl);
     } catch (Exception e) {
       throw new HoodieSchemaFetchException(String.format(
           "Error reading target schema from registry. Please check %s is 
configured correctly. If that is not configured then check %s. Truncated URL: 
%s",
           Config.SRC_SCHEMA_REGISTRY_URL_PROP,
           Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
-          StringUtils.truncate(targetSchemaRegistryUrl, 10, 10)), e);
+          StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
     }
   }
-
-  // Per SyncOnce call, the cachedschema for the provider is dropped and 
SourceSchema re-attained
-  // Subsequent calls to getSourceSchema within the write batch should be 
cached.
-  @Override
-  public void refresh() {
-    cachedSourceSchema = null;
-    cachedTargetSchema = null;
-    getSourceSchema();
-    getTargetSchema();
-  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 17a0ee2e3bf..e756602b1cd 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -449,10 +449,7 @@ public class StreamSync implements Serializable, Closeable 
{
 
       result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, 
overallTimerContext);
     }
-    // refresh schemas if need be before next batch
-    if (schemaProvider != null) {
-      schemaProvider.refresh();
-    }
+
     metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
     return result;
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 44421d5e059..59e04d77602 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -133,24 +133,4 @@ class TestSchemaRegistryProvider {
           .toString();
     }
   }
-
-  // The SR is checked when cachedSchema is empty, when not empty, the 
cachedSchema is used.
-  @Test
-  public void testGetSourceSchemaUsesCachedSchema() throws IOException {
-    TypedProperties props = getProps();
-    SchemaRegistryProvider spyUnderTest = getUnderTest(props);
-
-    // Call when cachedSchema is empty
-    Schema actual = spyUnderTest.getSourceSchema();
-    assertNotNull(actual);
-    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
-
-    assert spyUnderTest.cachedSourceSchema != null;
-
-    Schema actualTwo = spyUnderTest.getSourceSchema();
-    
-    // cachedSchema should now be set, a subsequent call should not call 
parseSchemaFromRegistry
-    // Assuming this verify() has the scope of the whole test? so it should 
still be 1 from previous call?
-    verify(spyUnderTest, times(1)).parseSchemaFromRegistry(Mockito.any());
-  }
 }

Reply via email to