nsivabalan commented on code in PR #10122:
URL: https://github.com/apache/hudi/pull/10122#discussion_r1399649058


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -29,37 +31,95 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.timeline.service.TimelineService;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Timeline Service that runs as part of write client.
  */
 public class EmbeddedTimelineService {
+  // lock used when starting/stopping/modifying embedded services
+  private static final Object SERVICE_LOCK = new Object();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedTimelineService.class);
-
+  private static final AtomicInteger NUM_SERVERS_RUNNING = new 
AtomicInteger(0);
+  // Map of port to existing timeline service running on that port

Review Comment:
   So, ideally we will have just one port and one timeline server, right? Unless 
someone explicitly configures a different port for the timeline server when 
they are running multiple jobs against multiple tables.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -29,37 +31,95 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.timeline.service.TimelineService;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Timeline Service that runs as part of write client.
  */
 public class EmbeddedTimelineService {
+  // lock used when starting/stopping/modifying embedded services
+  private static final Object SERVICE_LOCK = new Object();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedTimelineService.class);
-
+  private static final AtomicInteger NUM_SERVERS_RUNNING = new 
AtomicInteger(0);
+  // Map of port to existing timeline service running on that port
+  private static final Map<Integer, EmbeddedTimelineService> RUNNING_SERVICES 
= new HashMap<>();
+  private static final Registry METRICS_REGISTRY = 
Registry.getRegistry("TimelineService");
+  private static final String NUM_EMBEDDED_TIMELINE_SERVERS = 
"numEmbeddedTimelineServers";
   private int serverPort;
   private String hostAddr;
-  private HoodieEngineContext context;
+  private final HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
   private final HoodieWriteConfig writeConfig;
-  private final String basePath;
+  private TimelineService.Config serviceConfig;
+  private final Set<String> basePaths; // the set of base paths using this 
EmbeddedTimelineService
 
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
+  private EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
     this.writeConfig = writeConfig;
-    this.basePath = writeConfig.getBasePath();
+    this.basePaths = new HashSet<>();
+    this.basePaths.add(writeConfig.getBasePath());
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
   }
 
+  /**
+   * Returns an existing embedded timeline service if one is running for the 
given configuration and reuse is enabled, or starts a new one.
+   * @param context The {@link HoodieEngineContext} for the client
+   * @param embeddedTimelineServiceHostAddr The host address to use for the 
service (nullable)
+   * @param writeConfig The {@link HoodieWriteConfig} for the client
+   * @return A running {@link EmbeddedTimelineService}
+   * @throws IOException if an error occurs while starting the service
+   */
+  public static EmbeddedTimelineService 
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) throws 
IOException {
+    return getOrStartEmbeddedTimelineService(context, 
embeddedTimelineServiceHostAddr, writeConfig, TimelineService::new);
+  }
+
+  static EmbeddedTimelineService 
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
+                                                                   
TimelineServiceCreator timelineServiceCreator) throws IOException {
+    // if reuse is enabled, check if any existing instances are compatible
+    if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
+      synchronized (SERVICE_LOCK) {
+        for (EmbeddedTimelineService service : RUNNING_SERVICES.values()) {
+          if (service.canReuseFor(writeConfig, 
embeddedTimelineServiceHostAddr)) {
+            service.addBasePath(writeConfig.getBasePath());
+            LOG.info("Reusing existing embedded timeline server with 
configuration: " + service.serviceConfig);
+            return service;
+          }
+        }
+        // if no compatible instance is found, create a new one
+        EmbeddedTimelineService service = createAndStartService(context, 
embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator);
+        RUNNING_SERVICES.put(service.serverPort, service);

Review Comment:
   From the implementation of canReuseFor, I feel that a triplet of host, port, 
and markerType uniquely identifies a timeline server. 
   For example, let's say someone switches the marker type for a table from 
DIRECT to timeline-server-based. 
   In that case we might have two timeline servers spinning up. But in the 
source of truth, i.e. the RUNNING_SERVICES map, both entries would have the 
same key (the port), so the second entry might override the previous one.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -29,37 +31,95 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.timeline.service.TimelineService;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Timeline Service that runs as part of write client.
  */
 public class EmbeddedTimelineService {
+  // lock used when starting/stopping/modifying embedded services
+  private static final Object SERVICE_LOCK = new Object();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedTimelineService.class);
-
+  private static final AtomicInteger NUM_SERVERS_RUNNING = new 
AtomicInteger(0);
+  // Map of port to existing timeline service running on that port
+  private static final Map<Integer, EmbeddedTimelineService> RUNNING_SERVICES 
= new HashMap<>();
+  private static final Registry METRICS_REGISTRY = 
Registry.getRegistry("TimelineService");
+  private static final String NUM_EMBEDDED_TIMELINE_SERVERS = 
"numEmbeddedTimelineServers";
   private int serverPort;
   private String hostAddr;
-  private HoodieEngineContext context;
+  private final HoodieEngineContext context;
   private final SerializableConfiguration hadoopConf;
   private final HoodieWriteConfig writeConfig;
-  private final String basePath;
+  private TimelineService.Config serviceConfig;
+  private final Set<String> basePaths; // the set of base paths using this 
EmbeddedTimelineService
 
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
-  public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
+  private EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
     this.writeConfig = writeConfig;
-    this.basePath = writeConfig.getBasePath();
+    this.basePaths = new HashSet<>();
+    this.basePaths.add(writeConfig.getBasePath());
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
   }
 
+  /**
+   * Returns an existing embedded timeline service if one is running for the 
given configuration and reuse is enabled, or starts a new one.
+   * @param context The {@link HoodieEngineContext} for the client
+   * @param embeddedTimelineServiceHostAddr The host address to use for the 
service (nullable)
+   * @param writeConfig The {@link HoodieWriteConfig} for the client
+   * @return A running {@link EmbeddedTimelineService}
+   * @throws IOException if an error occurs while starting the service
+   */
+  public static EmbeddedTimelineService 
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) throws 
IOException {
+    return getOrStartEmbeddedTimelineService(context, 
embeddedTimelineServiceHostAddr, writeConfig, TimelineService::new);
+  }
+
+  static EmbeddedTimelineService 
getOrStartEmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
+                                                                   
TimelineServiceCreator timelineServiceCreator) throws IOException {
+    // if reuse is enabled, check if any existing instances are compatible
+    if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
+      synchronized (SERVICE_LOCK) {
+        for (EmbeddedTimelineService service : RUNNING_SERVICES.values()) {
+          if (service.canReuseFor(writeConfig, 
embeddedTimelineServiceHostAddr)) {
+            service.addBasePath(writeConfig.getBasePath());
+            LOG.info("Reusing existing embedded timeline server with 
configuration: " + service.serviceConfig);
+            return service;
+          }
+        }
+        // if no compatible instance is found, create a new one
+        EmbeddedTimelineService service = createAndStartService(context, 
embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator);
+        RUNNING_SERVICES.put(service.serverPort, service);

Review Comment:
   Oh, I also see the metadata config props being used. It is probably better 
to create an object with the entries below:
   ```
   host, port, marker type, metadataEnableFlag.
   ```
   So, for every unique combination of these, we should have a different 
timeline server instantiated. 
   
   In the straightforward case, when we are running 100 tables with the exact 
same values for (timeline service host, port, marker type, and 
metadataEnableFlag), we should have just one timeline server running and 
serving all 100 tables. 
   
   This aligns canReuseFor() with the timeline servers we are running and 
tracking in RUNNING_SERVICES.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -146,19 +196,65 @@ public FileSystemViewManager getViewManager() {
     return viewManager;
   }
 
-  public boolean canReuseFor(String basePath) {
-    return this.server != null
-        && this.viewManager != null
-        && this.basePath.equals(basePath);
+  /**
+   * Adds a new base path to the set that are managed by this instance.
+   * @param basePath the new base path to add
+   */
+  private void addBasePath(String basePath) {
+    basePaths.add(basePath);
+  }
+
+  private boolean canReuseFor(HoodieWriteConfig newWriteConfig, String 
newHostAddr) {
+    if (server == null || viewManager == null) {
+      return false; // service is not running
+    }
+    if (basePaths.contains(newWriteConfig.getBasePath())) {
+      return true; // already running for this base path
+    }
+    if (newHostAddr != null && !newHostAddr.equals(this.hostAddr)) {
+      return false; // different host address
+    }
+    if (writeConfig.getMarkersType() != newWriteConfig.getMarkersType()) {
+      return false; // different marker type
+    }
+    return 
metadataConfigsAreEquivalent(writeConfig.getMetadataConfig().getProps(), 
newWriteConfig.getMetadataConfig().getProps());
+  }
+
+  private boolean metadataConfigsAreEquivalent(Properties properties1, 
Properties properties2) {
+    Set<Object> metadataConfigs = new HashSet<>(properties1.keySet());
+    metadataConfigs.addAll(properties2.keySet());
+    return metadataConfigs.stream()
+        .filter(key -> ((String) 
key).startsWith(HoodieMetadataConfig.METADATA_PREFIX))
+        .allMatch(key -> {
+          String value1 = properties1.getProperty((String) key, "");
+          String value2 = properties2.getProperty((String) key, "");
+          return value1.equals(value2);
+        });
+
   }
 
-  public void stop() {
-    if (null != server) {
+  /**
+   * Stops the embedded timeline service for the given base path. If a 
timeline service is managing multiple tables, it will only be shutdown once all 
tables have been stopped.
+   * @param basePath For the table to stop the service for
+   */
+  public void stopForBasePath(String basePath) {

Review Comment:
   Minor: the method does not only stop the service for the base path; it also 
does some additional cleanup. 
   
   So, maybe rename it to `cleanUpAndMaybeStopService(basePath)`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -88,30 +148,20 @@ public void startServer() throws IOException {
           
.markerBatchIntervalMs(writeConfig.getMarkersTimelineServerBasedBatchIntervalMs())
           .markerParallelism(writeConfig.getMarkersDeleteParallelism());
     }
+    this.serviceConfig = timelineServiceConfBuilder.build();
 
-    if (writeConfig.isEarlyConflictDetectionEnable()) {

Review Comment:
   Let's also include these in the key object for RUNNING_SERVICES: 
   isEarlyConflictDetectionEnabled and 
   isTimelineServerBasedInstantStateEnabled. 
   If not, we might keep reusing the same timeline server while expecting 
certain behaviors based on these configs. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -146,19 +196,65 @@ public FileSystemViewManager getViewManager() {
     return viewManager;
   }
 
-  public boolean canReuseFor(String basePath) {
-    return this.server != null
-        && this.viewManager != null
-        && this.basePath.equals(basePath);
+  /**
+   * Adds a new base path to the set that are managed by this instance.
+   * @param basePath the new base path to add
+   */
+  private void addBasePath(String basePath) {
+    basePaths.add(basePath);
+  }
+
+  private boolean canReuseFor(HoodieWriteConfig newWriteConfig, String 
newHostAddr) {
+    if (server == null || viewManager == null) {
+      return false; // service is not running
+    }
+    if (basePaths.contains(newWriteConfig.getBasePath())) {
+      return true; // already running for this base path
+    }
+    if (newHostAddr != null && !newHostAddr.equals(this.hostAddr)) {
+      return false; // different host address
+    }
+    if (writeConfig.getMarkersType() != newWriteConfig.getMarkersType()) {
+      return false; // different marker type
+    }
+    return 
metadataConfigsAreEquivalent(writeConfig.getMetadataConfig().getProps(), 
newWriteConfig.getMetadataConfig().getProps());
+  }
+
+  private boolean metadataConfigsAreEquivalent(Properties properties1, 
Properties properties2) {
+    Set<Object> metadataConfigs = new HashSet<>(properties1.keySet());
+    metadataConfigs.addAll(properties2.keySet());
+    return metadataConfigs.stream()

Review Comment:
   We just need to know whether metadata is enabled or not; it is mainly used 
for the FileSystemView type. 
   We don't need to compare all the properties of the metadata table. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to