This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b995a8c5d3 [HUDI-6005] Auto generate client id for Flink multi writer (#8323)
4b995a8c5d3 is described below

commit 4b995a8c5d36c08744f08f218ddab84b1c6317bd
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Fri Mar 31 11:22:08 2023 +0800

    [HUDI-6005] Auto generate client id for Flink multi writer (#8323)
    
    Flink currently does not have a good message notification mechanism from the JM
    to the TM tasks. To work around this, we introduced two kinds of fs-based
    messages on the JM:
    
    1. the view_storage_conf file, which keeps the remote fs view storage properties
    2. the ckp_metadata, which is used to fetch the pending instants in a light-weight way
    
    Both messages work well in the single-writer scenario, where we simply use a
    fixed-name file path for these files/directories. But in the multi-writer
    scenario, writing into the same message file incurs conflicts: the message file
    eventually gets corrupted and the existing writers can be impacted.
    
    In HUDI-5673, we introduced the option 'write.client.id' for manual conflict
    resolution; the user needs to configure the client id for each of the multiple
    writers. This works, but it is very inconvenient in production practice.
    
    In this patch, we auto infer the client id when the job is submitted from the
    client machine. On startup, each job first scrambles for a client id; once the
    job is running, the coordinator on the JM sends a heartbeat message for its
    client id at a fixed interval (by default 1 minute).
    
    The heartbeat is mainly used to decide whether the job that holds the client id
    is still alive; if not, the client id can be recycled and reused.
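    
    A minimal usage sketch of the inference path (the table path is hypothetical;
    the calls mirror TestOptionsInference below). With OCC enabled and no explicit
    'write.client.id', the id is inferred at submission time:
    
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.PATH, "hdfs://warehouse/hudi/t1"); // hypothetical path
        conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
        OptionsInference.setupClientId(conf);
        // the first job gets "" (INIT_CLIENT_ID), later concurrent jobs get "1", "2", ...
        String clientId = conf.getString(FlinkOptions.WRITE_CLIENT_ID);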
---
 .../apache/hudi/configuration/FlinkOptions.java    |   3 +-
 .../hudi/configuration/OptionsInference.java       |  28 +++
 .../apache/hudi/configuration/OptionsResolver.java |   8 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  18 ++
 .../org/apache/hudi/sink/meta/CkpMetadata.java     |   7 +-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   1 +
 .../org/apache/hudi/table/HoodieTableSink.java     |   2 +
 .../main/java/org/apache/hudi/util/ClientIds.java  | 276 +++++++++++++++++++++
 .../apache/hudi/util/ViewStorageProperties.java    |   2 -
 .../hudi/configuration/TestOptionsInference.java   |  78 ++++++
 10 files changed, 417 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c1af00f06b7..2d9243ab5b4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -41,6 +41,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.apache.hudi.util.ClientIds;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -572,7 +573,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> WRITE_CLIENT_ID = ConfigOptions
       .key("write.client.id")
       .stringType()
-      .defaultValue("")
+      .defaultValue(ClientIds.INIT_CLIENT_ID)
       .withDescription("Unique identifier used to distinguish different writer 
pipelines for concurrent mode");
 
   // ------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
index 3e02d237327..9ffa71726e4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsInference.java
@@ -18,12 +18,17 @@
 
 package org.apache.hudi.configuration;
 
+import org.apache.hudi.util.ClientIds;
+
 import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tool helping to infer the flink options {@link FlinkOptions}.
  */
 public class OptionsInference {
+  private static final Logger LOG = LoggerFactory.getLogger(OptionsInference.class);
 
   /**
    * Sets up the default source task parallelism if it is not specified.
@@ -62,4 +67,27 @@ public class OptionsInference {
       conf.setInteger(FlinkOptions.CLUSTERING_TASKS, writeTasks);
     }
   }
+
+  /**
+   * Utilities that help to auto generate the client id for multi-writer scenarios.
+   * It basically handles two cases:
+   *
+   * <ul>
+   *   <li>find the next client id for the new job;</li>
+   *   <li>clean the existing inactive client heartbeat files.</li>
+   * </ul>
+   *
+   * @see ClientIds
+   */
+  public static void setupClientId(Configuration conf) {
+    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+      // explicit client id always has higher priority
+      if (!conf.contains(FlinkOptions.WRITE_CLIENT_ID)) {
+        try (ClientIds clientIds = ClientIds.builder().conf(conf).build()) {
+          String clientId = clientIds.nextId(conf);
+          conf.setString(FlinkOptions.WRITE_CLIENT_ID, clientId);
+        }
+      }
+    }
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 7db25105a04..7ab2d7b2dff 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -250,6 +250,14 @@ public class OptionsResolver {
             .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
   }
 
+  /**
+   * Returns whether OCC is enabled.
+   */
+  public static boolean isOptimisticConcurrencyControl(Configuration conf) {
+    return conf.getString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+        .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+  }
+
   /**
    * Returns the index type.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 778b528a118..55c6d99ac6c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -39,6 +39,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.ClientIds;
 import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
@@ -155,6 +156,11 @@ public class StreamWriteOperatorCoordinator
    */
   private CkpMetadata ckpMetadata;
 
+  /**
+   * The client id heartbeats.
+   */
+  private ClientIds clientIds;
+
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
@@ -193,6 +199,10 @@ public class StreamWriteOperatorCoordinator
     if (tableState.syncHive) {
       initHiveSync();
     }
+    // start client id heartbeats for optimistic concurrency control
+    if (OptionsResolver.isOptimisticConcurrencyControl(conf)) {
+      initClientIds(conf);
+    }
   }
 
   @Override
@@ -213,6 +223,9 @@ public class StreamWriteOperatorCoordinator
     if (this.ckpMetadata != null) {
       this.ckpMetadata.close();
     }
+    if (this.clientIds != null) {
+      this.clientIds.close();
+    }
   }
 
   @Override
@@ -350,6 +363,11 @@ public class StreamWriteOperatorCoordinator
     return ckpMetadata;
   }
 
+  private void initClientIds(Configuration conf) {
+    this.clientIds = ClientIds.builder().conf(conf).build();
+    this.clientIds.start();
+  }
+
   private void reset() {
     this.eventBuffer = new WriteMetadataEvent[this.parallelism];
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 6336365ad64..a9a099f8494 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -49,15 +49,15 @@ import java.util.stream.Collectors;
  *
  * <p>Why we use the DFS based message queue instead of sending
  * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
- * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails,
- * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write,
+ * The writer task thread handles the operator event using the main mailbox executor which has the lowest priority for mails,
+ * it is also used to process the inputs. When the writer task blocks and waits for the operator event to ack the valid instant to write,
  * it actually blocks all the subsequent events in the mailbox, the operator event would never be consumed then it causes deadlock.
  *
  * <p>The checkpoint metadata is also more lightweight than the active timeline.
  *
  * <p>NOTE: should be removed in the future if we have good manner to handle the async notifications from driver.
  */
-public class CkpMetadata implements Serializable {
+public class CkpMetadata implements Serializable, AutoCloseable {
   private static final long serialVersionUID = 1L;
 
   private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
@@ -219,6 +219,7 @@ public class CkpMetadata implements Serializable {
   }
 
   protected static String ckpMetaPath(String basePath, String uniqueId) {
+    // .hoodie/.aux/ckp_meta
     String metaPath = basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
     return StringUtils.isNullOrEmpty(uniqueId) ? metaPath : metaPath + "_" + uniqueId;
   }
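
For concreteness, the per-writer checkpoint message locations this convention yields
(the table base path below is hypothetical):

    // client id ""  (single writer) -> /warehouse/t1/.hoodie/.aux/ckp_meta
    // client id "1"                 -> /warehouse/t1/.hoodie/.aux/ckp_meta_1
    // client id "2"                 -> /warehouse/t1/.hoodie/.aux/ckp_meta_2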
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 16dae0768d0..c353dd2baa4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -99,6 +99,7 @@ public class HoodieFlinkStreamer {
     }
 
     OptionsInference.setupSinkTasks(conf, env.getParallelism());
+    OptionsInference.setupClientId(conf);
     DataStream<Object> pipeline;
     // Append mode
     if (OptionsResolver.isAppendMode(conf)) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index f8799d3ac94..af10b620e69 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,6 +69,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
       // set up default parallelism
       OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
+      // set up client id
+      OptionsInference.setupClientId(conf);
 
       RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClientIds.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClientIds.java
new file mode 100644
index 00000000000..dc12ae386f0
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClientIds.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieHeartbeatException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
+
+/**
+ * This class creates a client id heartbeat for a new driver.
+ * The heartbeat is used to ascertain whether a driver is still alive or not.
+ * Currently, it is mainly used for two cases:
+ *
+ * <ol>
+ *   <li>Legacy service cleaning, when a new driver starts up,
+ *   it also checks whether there are some zombie clients whose heartbeat file can be removed;</li>
+ *   <li>Auto generates a new client id, tries to reuse the existing ids from the zombie clients first or
+ *   generates an inc id based on the largest alive client id.
+ *   </li>
+ * </ol>
+ *
+ * <p>NOTE: Due to CPU contention on the driver/client node, the heartbeats could be delayed, hence it's important to set
+ * the value high enough to avoid that possibility.
+ */
+public class ClientIds implements AutoCloseable, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClientIds.class);
+
+  private static final String HEARTBEAT_FOLDER_NAME = ".ids";
+  private static final String HEARTBEAT_FILE_NAME_PREFIX = "_";
+  public static final String INIT_CLIENT_ID = "";
+  public static final long DEFAULT_HEARTBEAT_INTERVAL_IN_MS = 60 * 1000; // default 1 minute
+  public static final int DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES = 5;    // by default the service is considered stopped if it is inactive for 5 minutes
+
+  /**
+   * The filesystem.
+   */
+  private final transient FileSystem fs;
+
+  /**
+   * The heartbeat file path.
+   */
+  private final Path heartbeatFilePath;
+
+  /**
+   * The heartbeat interval in milliseconds.
+   */
+  private final long heartbeatIntervalInMs;
+
+  /**
+   * The threshold beyond which we can think the service is a zombie.
+   */
+  private final long heartbeatTimeoutThresholdInMs;
+
+  private ScheduledExecutorService executor;
+
+  private boolean started;
+
+  private ClientIds(FileSystem fs, String basePath, String uniqueId, long heartbeatIntervalInMs, int numTolerableHeartbeatMisses) {
+    this.fs = fs;
+    this.heartbeatFilePath = getHeartbeatFilePath(basePath, uniqueId);
+    this.heartbeatIntervalInMs = heartbeatIntervalInMs;
+    this.heartbeatTimeoutThresholdInMs = numTolerableHeartbeatMisses * heartbeatIntervalInMs;
+  }
+
+  public void start() {
+    if (started) {
+      LOG.info("The service heartbeat client is already started, skips the action");
+      return;
+    }
+    updateHeartbeat();
+    this.executor = Executors.newScheduledThreadPool(1);
+    this.executor.scheduleAtFixedRate(this::updateHeartbeat, this.heartbeatIntervalInMs, this.heartbeatIntervalInMs, TimeUnit.MILLISECONDS);
+    this.started = true;
+  }
+
+  /**
+   * Returns the builder.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public void close() {
+    if (this.executor != null) {
+      this.executor.shutdownNow();
+      this.executor = null;
+    }
+    this.started = false;
+  }
+
+  public static boolean isHeartbeatExpired(FileSystem fs, Path path, long timeoutThreshold) {
+    try {
+      if (fs.exists(path)) {
+        long modifyTime = fs.getFileStatus(path).getModificationTime();
+        long currentTime = System.currentTimeMillis();
+        return currentTime - modifyTime > timeoutThreshold;
+      }
+    } catch (IOException e) {
+      // if any exception happens, just return false.
+      LOG.error("Check heartbeat file existence error: " + path);
+    }
+    return false;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  private String getHeartbeatFolderPath(String basePath) {
+    return basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME + Path.SEPARATOR + HEARTBEAT_FOLDER_NAME;
+  }
+
+  private Path getHeartbeatFilePath(String basePath, String uniqueId) {
+    String heartbeatFolderPath = getHeartbeatFolderPath(basePath);
+    String fileName = StringUtils.isNullOrEmpty(uniqueId) ? HEARTBEAT_FILE_NAME_PREFIX : HEARTBEAT_FILE_NAME_PREFIX + uniqueId;
+    return new Path(heartbeatFolderPath, fileName);
+  }
+
+  private void updateHeartbeat() throws HoodieHeartbeatException {
+    updateHeartbeat(this.heartbeatFilePath);
+  }
+
+  private void updateHeartbeat(Path heartbeatFilePath) throws HoodieHeartbeatException {
+    try {
+      OutputStream outputStream =
+          this.fs.create(heartbeatFilePath, true);
+      outputStream.close();
+    } catch (IOException io) {
+      throw new HoodieHeartbeatException("Unable to generate heartbeat ", io);
+    }
+  }
+
+  @VisibleForTesting
+  public String nextId(Configuration conf) {
+    String basePath = conf.getString(FlinkOptions.PATH);
+    String nextId = nextId(conf, basePath);
+    // update the heartbeat immediately in case there are client preemption conflicts for the same id.
+    updateHeartbeat(getHeartbeatFilePath(basePath, nextId));
+    return nextId;
+  }
+
+  private String nextId(Configuration conf, String basePath) {
+    Path heartbeatFolderPath = new Path(getHeartbeatFolderPath(basePath));
+    FileSystem fs = FSUtils.getFs(heartbeatFolderPath, HadoopConfigurations.getHadoopConf(conf));
+    try {
+      if (!fs.exists(heartbeatFolderPath)) {
+        return INIT_CLIENT_ID;
+      }
+      List<Path> sortedPaths = Arrays.stream(fs.listStatus(heartbeatFolderPath))
+          .map(FileStatus::getPath)
+          .sorted(Comparator.comparing(Path::getName))
+          .collect(Collectors.toList());
+      if (sortedPaths.isEmpty()) {
+        return INIT_CLIENT_ID;
+      }
+      List<Path> zombieHeartbeatPaths = sortedPaths.stream()
+          .filter(path -> ClientIds.isHeartbeatExpired(fs, path, this.heartbeatTimeoutThresholdInMs))
+          .collect(Collectors.toList());
+      if (!zombieHeartbeatPaths.isEmpty()) {
+        // 1. If there are any zombie client ids, reuse the smallest one
+        for (Path path : zombieHeartbeatPaths) {
+          fs.delete(path, true);
+          LOG.warn("Delete inactive ckp metadata path: " + path);
+        }
+        return getClientId(zombieHeartbeatPaths.get(0));
+      }
+      // 2. else returns an auto inc id
+      String largestClientId = getClientId(sortedPaths.get(sortedPaths.size() - 1));
+      return INIT_CLIENT_ID.equals(largestClientId) ? "1" : (Integer.parseInt(largestClientId) + 1) + "";
+    } catch (IOException e) {
+      throw new RuntimeException("Generate next client id error", e);
+    }
+  }
+
+  /**
+   * Returns the client id from the heartbeat file path, the path name follows
+   * the naming convention: _, _1, _2, ... _N.
+   */
+  private static String getClientId(Path path) {
+    String[] splits = path.getName().split(HEARTBEAT_FILE_NAME_PREFIX);
+    return splits.length > 1 ? splits[1] : INIT_CLIENT_ID;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Inner classes
+  // -------------------------------------------------------------------------
+
+  /**
+   * Builder for {@link ClientIds}.
+   */
+  public static class Builder {
+    private FileSystem fs;
+    private String basePath;
+    private String clientId = INIT_CLIENT_ID;
+    private long heartbeatIntervalInMs = DEFAULT_HEARTBEAT_INTERVAL_IN_MS;
+    private int numTolerableHeartbeatMisses = DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES;
+
+    public Builder fs(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder basePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder clientId(String clientId) {
+      this.clientId = clientId;
+      return this;
+    }
+
+    public Builder conf(Configuration conf) {
+      this.basePath = conf.getString(FlinkOptions.PATH);
+      this.fs = FSUtils.getFs(this.basePath, HadoopConfigurations.getHadoopConf(conf));
+      this.clientId = conf.getString(FlinkOptions.WRITE_CLIENT_ID);
+      return this;
+    }
+
+    public Builder heartbeatIntervalInMs(long interval) {
+      this.heartbeatIntervalInMs = interval;
+      return this;
+    }
+
+    public Builder numTolerableHeartbeatMisses(int numMisses) {
+      this.numTolerableHeartbeatMisses = numMisses;
+      return this;
+    }
+
+    public ClientIds build() {
+      return new ClientIds(Objects.requireNonNull(this.fs), Objects.requireNonNull(this.basePath),
+          this.clientId, this.heartbeatIntervalInMs, this.numTolerableHeartbeatMisses);
+    }
+  }
+}
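
For reference, a minimal sketch of the liveness rule this utility applies (the local
filesystem and heartbeat path below are assumptions for illustration): a client id
becomes reusable once its heartbeat file has not been refreshed for
heartbeatIntervalInMs * numTolerableHeartbeatMisses, i.e. 60,000 ms * 5 = 5 minutes
with the defaults:

    FileSystem fs = FileSystem.getLocal(new org.apache.hadoop.conf.Configuration());
    Path heartbeat = new Path("/warehouse/t1/.hoodie/.aux/.ids/_1"); // hypothetical path
    long threshold = ClientIds.DEFAULT_HEARTBEAT_INTERVAL_IN_MS
        * ClientIds.DEFAULT_NUM_TOLERABLE_HEARTBEAT_MISSES; // 300,000 ms
    boolean zombie = ClientIds.isHeartbeatExpired(fs, heartbeat, threshold);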
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
index 0f3de9127b7..2a1f523fdb0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java
@@ -47,8 +47,6 @@ public class ViewStorageProperties {
 
   private static final String FILE_NAME = "view_storage_conf";
 
-  private static final String FILE_SUFFIX = ".properties";
-
   /**
    * Initialize the {@link #FILE_NAME} meta file.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsInference.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsInference.java
new file mode 100644
index 00000000000..67777b016a7
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsInference.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.configuration;
+
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.util.ClientIds;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test cases for {@link OptionsInference}.
+ */
+public class TestOptionsInference {
+  @TempDir
+  File tempFile;
+
+  @Test
+  void testSetupClientId() throws Exception {
+    Configuration conf = getConf();
+    conf.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    OptionsInference.setupClientId(conf);
+    assertThat("Explicit client id has higher priority",
+        conf.getString(FlinkOptions.WRITE_CLIENT_ID), is("2"));
+
+    for (int i = 0; i < 3; i++) {
+      conf = getConf();
+      try (ClientIds clientIds = ClientIds.builder().conf(conf).build()) {
+        OptionsInference.setupClientId(conf);
+        String expectedId = i == 0 ? ClientIds.INIT_CLIENT_ID : i + "";
+        assertThat("The client id should auto inc to " + expectedId,
+            conf.getString(FlinkOptions.WRITE_CLIENT_ID), is(expectedId));
+      }
+    }
+
+    // sleep 1 second to simulate a zombie heartbeat
+    Thread.sleep(1000);
+    conf = getConf();
+    try (ClientIds clientIds = ClientIds.builder()
+        .conf(conf)
+        .heartbeatIntervalInMs(10) // max 10 milliseconds tolerable heartbeat timeout
+        .numTolerableHeartbeatMisses(1).build()) {
+      String nextId = clientIds.nextId(conf);
+      assertThat("The inactive client id should be reused",
+          nextId, is(""));
+    }
+  }
+
+  private Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value());
+    conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
+    return conf;
+  }
+}
