This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bba6af0fa1 [HUDI-6462] Add Hudi client init callback interface (#9108)
4bba6af0fa1 is described below

commit 4bba6af0fa104ad8eef0ecd62e0aedf67bbe33a4
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Mon Jul 3 22:18:37 2023 -0700

    [HUDI-6462] Add Hudi client init callback interface (#9108)
    
    This PR adds the interface for Hudi client init callback to run custom 
logic at the time of initialization of a Hudi client:
    
    @PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
    public interface HoodieClientInitCallback {
      /**
       * A callback method in which the user can implement custom logic.
       * This method is called when a {@link BaseHoodieClient} is initialized.
       *
       * @param hoodieClient {@link BaseHoodieClient} instance.
       */
      @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
      void call(BaseHoodieClient hoodieClient);
    }
    At the time of instantiation of the write or table service client, a user 
may want to do additional processing, such as sending metrics, logsm 
notification, or adding more properties to the write config. The implementation 
of client init callback interface allows such logic to be plugged into Hudi.
    
    A new config, hoodie.client.init.callback.classes, is added for plugging in 
the callback implementation. The class list is comma-separated.
---
 .../hudi/callback/HoodieClientInitCallback.java    |  40 ++++
 .../org/apache/hudi/client/BaseHoodieClient.java   |  21 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  31 ++-
 .../callback/TestHoodieClientInitCallback.java     | 234 +++++++++++++++++++++
 4 files changed, 320 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
new file mode 100644
index 00000000000..a86eded75e5
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/HoodieClientInitCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.client.BaseHoodieClient;
+
+/**
+ * A callback interface to run custom logic at the time of initialization of 
the Hudi client.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieClientInitCallback {
+  /**
+   * A callback method in which the user can implement custom logic.
+   * This method is called when a {@link BaseHoodieClient} is initialized.
+   *
+   * @param hoodieClient {@link BaseHoodieClient} instance.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  void call(BaseHoodieClient hoodieClient);
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index e01ffb20719..26b10c1c1bf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.HoodieClientInitCallback;
 import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -30,8 +31,11 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.metrics.HoodieMetrics;
@@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
@@ -92,6 +97,7 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     this.txnManager = new TransactionManager(config, fs);
     startEmbeddedServerView();
     initWrapperFSMetrics();
+    runClientInitCallbacks();
   }
 
   /**
@@ -137,6 +143,21 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
     }
   }
 
+  private void runClientInitCallbacks() {
+    String callbackClassNames = config.getClientInitCallbackClassNames();
+    if (StringUtils.isNullOrEmpty(callbackClassNames)) {
+      return;
+    }
+    Arrays.stream(callbackClassNames.split(",")).forEach(callbackClass -> {
+      Object callback = ReflectionUtils.loadClass(callbackClass);
+      if (!(callback instanceof HoodieClientInitCallback)) {
+        throw new HoodieException(callbackClass + " is not a subclass of "
+            + HoodieClientInitCallback.class.getName());
+      }
+      ((HoodieClientInitCallback) callback).call(this);
+    });
+  }
+
   public HoodieWriteConfig getConfig() {
     return config;
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bc964b3cfe8..93105491180 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -698,14 +698,24 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "a configured filter `ssl`, value for config 
`ssl.trustore.location` would be masked.");
 
   public static final ConfigProperty<Boolean> ROLLBACK_INSTANT_BACKUP_ENABLED 
= ConfigProperty
-          .key("hoodie.rollback.instant.backup.enabled")
-          .defaultValue(false)
-          .withDocumentation("Backup instants removed during rollback and 
restore (useful for debugging)");
+      .key("hoodie.rollback.instant.backup.enabled")
+      .defaultValue(false)
+      .withDocumentation("Backup instants removed during rollback and restore 
(useful for debugging)");
 
   public static final ConfigProperty<String> ROLLBACK_INSTANT_BACKUP_DIRECTORY 
= ConfigProperty
-          .key("hoodie.rollback.instant.backup.dir")
-          .defaultValue(".rollback_backup")
-          .withDocumentation("Path where instants being rolled back are 
copied. If not absolute path then a directory relative to .hoodie folder is 
created.");
+      .key("hoodie.rollback.instant.backup.dir")
+      .defaultValue(".rollback_backup")
+      .withDocumentation("Path where instants being rolled back are copied. If 
not absolute path then a directory relative to .hoodie folder is created.");
+
+  public static final ConfigProperty<String> CLIENT_INIT_CALLBACK_CLASS_NAMES 
= ConfigProperty
+      .key("hoodie.client.init.callback.classes")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("Fully-qualified class names of the Hudi client init 
callbacks to run "
+          + "at the initialization of the Hudi client.  The class names are 
separated by `,`. "
+          + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
+          + "By default, no Hudi client init callback is executed.");
 
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
@@ -2535,6 +2545,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(ROLLBACK_INSTANT_BACKUP_DIRECTORY);
   }
 
+  public String getClientInitCallbackClassNames() {
+    return getString(CLIENT_INIT_CALLBACK_CLASS_NAMES);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3017,6 +3031,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withClientInitCallbackClassNames(String classNames) {
+      writeConfig.setValue(CLIENT_INIT_CALLBACK_CLASS_NAMES, classNames);
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, 
getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
new file mode 100644
index 00000000000..1ede02413fb
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback;
+import org.apache.hudi.client.BaseHoodieClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.callback.TestHoodieClientInitCallback.AddConfigInitCallbackTestClass.CUSTOM_CONFIG_KEY1;
+import static 
org.apache.hudi.callback.TestHoodieClientInitCallback.AddConfigInitCallbackTestClass.CUSTOM_CONFIG_VALUE1;
+import static 
org.apache.hudi.callback.TestHoodieClientInitCallback.ChangeConfigInitCallbackTestClass.CUSTOM_CONFIG_KEY2;
+import static 
org.apache.hudi.callback.TestHoodieClientInitCallback.ChangeConfigInitCallbackTestClass.CUSTOM_CONFIG_VALUE2;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link HoodieClientInitCallback}.
+ */
+public class TestHoodieClientInitCallback {
+  @TempDir
+  java.nio.file.Path tmpDir;
+
+  @Mock
+  static HoodieSparkEngineContext engineContext =
+      Mockito.mock(HoodieSparkEngineContext.class);
+
+  @BeforeAll
+  public static void setup() {
+    when(engineContext.getHadoopConf())
+        .thenReturn(new SerializableConfiguration(new Configuration()));
+  }
+
+  @Test
+  public void testNoClientInitCallback() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tmpDir.toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        .build(false);
+    assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+
+    SparkRDDWriteClient<Object> writeClient = new 
SparkRDDWriteClient<>(engineContext, config);
+
+    assertFalse(writeClient.getConfig().contains(CUSTOM_CONFIG_KEY1));
+    
assertFalse(writeClient.getTableServiceClient().getConfig().contains(CUSTOM_CONFIG_KEY1));
+  }
+
+  @Test
+  public void testSingleClientInitCallback() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tmpDir.toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        
.withClientInitCallbackClassNames(ChangeConfigInitCallbackTestClass.class.getName())
+        .withProps(Collections.singletonMap(
+            WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
+        .build(false);
+    assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+    assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+        .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+
+    SparkRDDWriteClient<Object> writeClient = new 
SparkRDDWriteClient<>(engineContext, config);
+
+    HoodieWriteConfig updatedConfig = writeClient.getConfig();
+    assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+    Schema actualSchema = new 
Schema.Parser().parse(updatedConfig.getWriteSchema());
+    assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+    assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+
+    updatedConfig = writeClient.getTableServiceClient().getConfig();
+    assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+    actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+    assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+    assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+  }
+
+  @Test
+  public void testTwoClientInitCallbacks() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tmpDir.toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withClientInitCallbackClassNames(
+            ChangeConfigInitCallbackTestClass.class.getName() + ","
+                + AddConfigInitCallbackTestClass.class.getName())
+        .withProps(Collections.singletonMap(
+            WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
+        .build(false);
+    assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
+    assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+        .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+
+    SparkRDDWriteClient<Object> writeClient = new 
SparkRDDWriteClient<>(engineContext, config);
+
+    HoodieWriteConfig updatedConfig = writeClient.getConfig();
+    assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+    assertEquals(CUSTOM_CONFIG_VALUE1, 
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
+    Schema actualSchema = new 
Schema.Parser().parse(updatedConfig.getWriteSchema());
+    assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+    assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+
+    updatedConfig = writeClient.getTableServiceClient().getConfig();
+    assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
+    assertEquals(CUSTOM_CONFIG_VALUE1, 
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
+    actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+    assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
+    assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
+  }
+
+  @Test
+  public void testClientInitCallbackThrowingException() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tmpDir.toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withClientInitCallbackClassNames(
+            AddConfigInitCallbackTestClass.class.getName() + ","
+                + ThrowExceptionCallbackTestClass.class.getName())
+        .build(false);
+    HoodieIOException exception = assertThrows(
+        HoodieIOException.class,
+        () -> new SparkRDDWriteClient<>(engineContext, config),
+        "Expects the initialization to throw a HoodieIOException");
+    assertEquals(
+        "Throwing exception during client initialization.",
+        exception.getMessage());
+  }
+
+  @ParameterizedTest
+  @MethodSource("testArgsForNonCallbackClass")
+  public void testNonClientInitCallbackClassInConfig(String className, String 
errorMsg) {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(tmpDir.toString())
+        .withEmbeddedTimelineServerEnabled(false)
+        .withClientInitCallbackClassNames(className)
+        .build(false);
+    HoodieException exception = assertThrows(
+        HoodieException.class,
+        () -> new SparkRDDWriteClient<>(engineContext, config),
+        "Expects the initialization to throw a HoodieException");
+    assertEquals(errorMsg, exception.getMessage());
+  }
+
+  private static Stream<Arguments> testArgsForNonCallbackClass() {
+    return Arrays.stream(new String[][] {
+        {HoodieWriteCommitHttpCallback.class.getName(),
+            "Could not load class " + 
HoodieWriteCommitHttpCallback.class.getName()},
+        {NonSortPartitionerWithRows.class.getName(),
+            NonSortPartitionerWithRows.class.getName() + " is not a subclass 
of " + HoodieClientInitCallback.class.getName()}
+    }).map(Arguments::of);
+  }
+
+  /**
+   * A test {@link HoodieClientInitCallback} implementation to add 
`user.defined.key1` config.
+   */
+  public static class AddConfigInitCallbackTestClass implements 
HoodieClientInitCallback {
+    public static final String CUSTOM_CONFIG_KEY1 = "user.defined.key1";
+    public static final String CUSTOM_CONFIG_VALUE1 = "value1";
+
+    @Override
+    public void call(BaseHoodieClient hoodieClient) {
+      HoodieWriteConfig config = hoodieClient.getConfig();
+      config.setValue(CUSTOM_CONFIG_KEY1, CUSTOM_CONFIG_VALUE1);
+    }
+  }
+
+  /**
+   * A test {@link HoodieClientInitCallback} implementation to add the property
+   * `user.defined.key2=value2` to the write schema.
+   */
+  public static class ChangeConfigInitCallbackTestClass implements 
HoodieClientInitCallback {
+    public static final String CUSTOM_CONFIG_KEY2 = "user.defined.key2";
+    public static final String CUSTOM_CONFIG_VALUE2 = "value2";
+
+    @Override
+    public void call(BaseHoodieClient hoodieClient) {
+      HoodieWriteConfig config = hoodieClient.getConfig();
+      Schema schema = new Schema.Parser().parse(config.getWriteSchema());
+      if (!schema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2)) {
+        schema.addProp(CUSTOM_CONFIG_KEY2, CUSTOM_CONFIG_VALUE2);
+      }
+      config.getProps().setProperty(WRITE_SCHEMA_OVERRIDE.key(), 
schema.toString());
+    }
+  }
+
+  /**
+   * A test {@link HoodieClientInitCallback} implementation to throw an 
exception.
+   */
+  public static class ThrowExceptionCallbackTestClass implements 
HoodieClientInitCallback {
+    @Override
+    public void call(BaseHoodieClient hoodieClient) {
+      throw new HoodieIOException("Throwing exception during client 
initialization.");
+    }
+  }
+}

Reply via email to