[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-10 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r686474524



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * Stores the state of a marker directory.
+ *
+ * The operations inside this class is designed to be thread-safe.
+ */
+public class MarkerDirState implements Serializable {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Marker directory
+  private final String markerDirPath;
+  private final FileSystem fileSystem;
+  private final Registry metricsRegistry;
+  // A cached copy of all markers in memory
+  private final Set allMarkers = new HashSet<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder
+  // for efficient appending
+  // Mapping: {markerFileIndex -> markers}
+  private final Map fileMarkersMap = new HashMap<>();
+  // A list of use status of underlying files storing markers by a thread.
+  // {@code true} means the file is in use by a {@code 
BatchCreateMarkerRunnable}.
+  // Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
+  private final List threadUseStatus;
+  // A list of pending futures from async marker creation requests
+  private final List markerCreationFutures = 
new ArrayList<>();
+  private final int parallelism;
+  private final Object lazyInitLock = new Object();
+  private final Object markerCreationProcessingLock = new Object();
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Last underlying file index used, for finding the next file index
+  // in a round-robin fashion
+  private int lastFileIndexUsed = 0;
+  private boolean lazyInitComplete;
+
+  public MarkerDirState(String markerDirPath, int markerBatchNumThreads, 
FileSystem fileSystem,
+Registry metricsRegistry, HoodieEngineContext 
hoodieEngineContext, int parallelism) {
+this.markerDirPath = markerDirPath;
+this.fileSystem = fileSystem;
+this.metricsRegistry = metricsRegistry;
+this.hoodieEngineContext = hoodieEngineContext;
+this.parallelism = parallelism;
+this.threadUseStatus =
+Stream.generate(() -> 
false).limit(markerBatchNumThreads).collect(Collectors.toList());
+this.lazyInitComplete = false;
+  }
+
+  /**
+   * @return  {@code true} if the marker directory exists in the system.
+   */
+  public boolean 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-10 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r686473475



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * Stores the state of a marker directory.
+ *
+ * The operations inside this class is designed to be thread-safe.
+ */
+public class MarkerDirState implements Serializable {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Marker directory
+  private final String markerDirPath;
+  private final FileSystem fileSystem;
+  private final Registry metricsRegistry;
+  // A cached copy of all markers in memory
+  private final Set allMarkers = new HashSet<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder
+  // for efficient appending
+  // Mapping: {markerFileIndex -> markers}
+  private final Map fileMarkersMap = new HashMap<>();
+  // A list of use status of underlying files storing markers by a thread.
+  // {@code true} means the file is in use by a {@code 
BatchCreateMarkerRunnable}.
+  // Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
+  private final List threadUseStatus;
+  // A list of pending futures from async marker creation requests
+  private final List markerCreationFutures = 
new ArrayList<>();
+  private final int parallelism;
+  private final Object lazyInitLock = new Object();
+  private final Object markerCreationProcessingLock = new Object();
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Last underlying file index used, for finding the next file index
+  // in a round-robin fashion
+  private int lastFileIndexUsed = 0;
+  private boolean lazyInitComplete;
+
+  public MarkerDirState(String markerDirPath, int markerBatchNumThreads, 
FileSystem fileSystem,
+Registry metricsRegistry, HoodieEngineContext 
hoodieEngineContext, int parallelism) {
+this.markerDirPath = markerDirPath;
+this.fileSystem = fileSystem;
+this.metricsRegistry = metricsRegistry;
+this.hoodieEngineContext = hoodieEngineContext;
+this.parallelism = parallelism;
+this.threadUseStatus =
+Stream.generate(() -> 
false).limit(markerBatchNumThreads).collect(Collectors.toList());
+this.lazyInitComplete = false;
+  }
+
+  /**
+   * @return  {@code true} if the marker directory exists in the system.
+   */
+  public boolean 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-10 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r686471378



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirRequestContext.java
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import java.util.List;
+
+/**
+ * Input of batch processing of marker creation requests for a single marker 
directory.
+ */
+public class MarkerDirRequestContext {

Review comment:
   Renamed to `BatchedMarkerCreationContext` to reflect batched marker 
creation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-10 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r686407495



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.timeline.service.TimelineService;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationCompletableFuture;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
+import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
+
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+
+  private final Registry metricsRegistry;
+  // a scheduled executor service to schedule dispatching of marker creation 
requests
+  private final ScheduledExecutorService dispatchingExecutorService;
+  // an executor service to schedule the worker threads of batch processing 
marker creation requests
+  private final ExecutorService batchingExecutorService;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  // Marker directory states, {markerDirPath -> MarkerDirState instance}
+  private final Map markerDirStateMap = new 
HashMap<>();
+  // A thread to dispatch marker creation requests to batch processing threads
+  private final MarkerCreationDispatchingRunnable 
markerCreationDispatchingRunnable;
+  private final AtomicBoolean firstCreationRequestSeen;
+  private transient HoodieEngineContext hoodieEngineContext;
+  private ScheduledFuture dispatchingScheduledFuture;

Review comment:
   renamed to `dispatchingThreadFuture`.

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595110



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();

Review comment:
   The multiple concurrent writers haven't been tested against the 
timeline-server-based markers, though it is designed to consider such case.  I 
created a TODO to verify this later on: 
https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684594558



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -423,7 +424,8 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
   protected void postCommit(HoodieTable table, 
HoodieCommitMetadata metadata, String instantTime, Option> 
extraMetadata) {
 try {
   // Delete the marker directory for the instant.
-  new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, 
config.getMarkersDeleteParallelism());
+  MarkerFilesFactory.get(config.getMarkersIOMode(), table, instantTime)

Review comment:
   This is a TODO in the follow-up PR: 
https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595766



##
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##
@@ -601,4 +604,84 @@ public static HoodieWrapperFileSystem getFs(String path, 
SerializableConfigurati
 .filter(fileStatus -> 
!fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
 .collect(Collectors.toList());
   }
+
+  /**
+   * Deletes a directory by deleting sub-paths in parallel on the file system.
+   *
+   * @param hoodieEngineContext {@code HoodieEngineContext} instance
+   * @param fs file system
+   * @param dirPath directory path
+   * @param parallelism parallelism to use for sub-paths
+   * @return {@code true} if the directory is delete; {@code false} otherwise.
+   */
+  public static boolean deleteDir(
+  HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, 
int parallelism) {
+try {
+  if (fs.exists(dirPath)) {
+FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, 
parallelism, e -> true,
+pairOfSubPathAndConf -> 
deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue())
+);
+
+boolean result = fs.delete(dirPath, true);
+LOG.info("Removed directory at " + dirPath);
+return result;
+  }
+} catch (IOException ioe) {
+  throw new HoodieIOException(ioe.getMessage(), ioe);
+}
+return false;
+  }
+
+  /**
+   * Processes sub-path in parallel.
+   *
+   * @param hoodieEngineContext {@code HoodieEngineContext} instance
+   * @param fs file system
+   * @param dirPath directory path
+   * @param parallelism parallelism to use for sub-paths
+   * @param subPathPredicate predicate to use to filter sub-paths for 
processing
+   * @param pairFunction actual processing logic for each sub-path
+   * @param  type of result to return for each sub-path
+   * @return a map of sub-path to result of the processing
+   */
+  public static  Map parallelizeSubPathProcess(

Review comment:
   This one is removed from the PR due to `Task not serializable`.  I'll 
add the unit tests once I fix the issue and check it back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595662



##
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##
@@ -601,4 +604,84 @@ public static HoodieWrapperFileSystem getFs(String path, 
SerializableConfigurati
 .filter(fileStatus -> 
!fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
 .collect(Collectors.toList());
   }
+
+  /**
+   * Deletes a directory by deleting sub-paths in parallel on the file system.
+   *
+   * @param hoodieEngineContext {@code HoodieEngineContext} instance
+   * @param fs file system
+   * @param dirPath directory path
+   * @param parallelism parallelism to use for sub-paths
+   * @return {@code true} if the directory is delete; {@code false} otherwise.
+   */
+  public static boolean deleteDir(

Review comment:
   Yes, sg.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595608



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595516



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595496



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595179



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684595110



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();

Review comment:
   The multiple concurrent writers haven't been tested against the 
timeline-server-based markers, though it is designed to consider such case.  I 
created a TODO to verify this later on: 
https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271?filter=allissues




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684594558



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -423,7 +424,8 @@ protected void preWrite(String instantTime, 
WriteOperationType writeOperationTyp
   protected void postCommit(HoodieTable table, 
HoodieCommitMetadata metadata, String instantTime, Option> 
extraMetadata) {
 try {
   // Delete the marker directory for the instant.
-  new MarkerFiles(table, instantTime).quietDeleteMarkerDir(context, 
config.getMarkersDeleteParallelism());
+  MarkerFilesFactory.get(config.getMarkersIOMode(), table, instantTime)

Review comment:
   This is a TODO in the follow-up PR: 
https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271?filter=allissues




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-07 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r684594464



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List markerFilesUseStatus;

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-04 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r682352714



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.timeline.service.TimelineService;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationCompletableFuture;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
+import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
+
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+
+  private final Registry metricsRegistry;
+  // a scheduled executor service to schedule dispatching of marker creation 
requests
+  private final ScheduledExecutorService dispatchingExecutorService;
+  // an executor service to schedule the worker threads of batch processing 
marker creation requests
+  private final ExecutorService batchingExecutorService;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  // Marker directory states, {markerDirPath -> MarkerDirState instance}
+  private final Map markerDirStateMap = new 
HashMap<>();
+  // A long-running thread to dispatch marker creation requests to batch 
processing threads
+  private final MarkerCreationDispatchingRunnable 
markerCreationDispatchingRunnable;
+  private final AtomicBoolean firstCreationRequestSeen;
+  private transient HoodieEngineContext hoodieEngineContext;
+  private ScheduledFuture dispatchingScheduledFuture;
+
+  public MarkerHandler(Configuration conf, TimelineService.Config 
timelineServiceConfig,
+   HoodieEngineContext hoodieEngineContext, FileSystem 
fileSystem,
+   FileSystemViewManager viewManager, Registry 
metricsRegistry) throws IOException {
+super(conf, timelineServiceConfig, fileSystem, viewManager);
+LOG.debug("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+LOG.debug("MarkerHandler batching params: batchNumThreads=" + 
timelineServiceConfig.markerBatchNumThreads
++ " batchIntervalMs=" + timelineServiceConfig.markerBatchIntervalMs + 
"ms");
+

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-04 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r682349212



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * Stores the state of a marker directory.
+ *
+ * The operations inside this class is designed to be thread-safe.
+ */
+public class MarkerDirState implements Serializable {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Marker directory
+  private final String markerDirPath;
+  private final FileSystem fileSystem;
+  private final Registry metricsRegistry;
+  // A cached copy of all markers in memory
+  private final Set allMarkers = new HashSet<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder
+  // for efficient appending
+  // Mapping: {markerFileIndex -> markers}
+  private final Map fileMarkersMap = new HashMap<>();
+  // A list of use status of underlying files storing markers by a thread.
+  // {@code true} means the file is in use by a {@code 
BatchCreateMarkerRunnable}.
+  // Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
+  private final List threadUseStatus;
+  // A list of pending futures from async marker creation requests
+  private final List markerCreationFutures = 
new ArrayList<>();
+  private final int parallelism;
+  private final Object lazyInitLock = new Object();
+  private final Object markerCreationProcessingLock = new Object();
+  private transient HoodieEngineContext hoodieEngineContext;
+  // Last underlying file index used, for finding the next file index
+  // in a round-robin fashion
+  private int lastFileIndex = 0;

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-04 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r682348886



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.timeline.service.TimelineService;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationCompletableFuture;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
+import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
+
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+
+  private final Registry metricsRegistry;
+  // a scheduled executor service to schedule dispatching of marker creation 
requests
+  private final ScheduledExecutorService dispatchingExecutorService;
+  // an executor service to schedule the worker threads of batch processing 
marker creation requests
+  private final ExecutorService batchingExecutorService;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  // Marker directory states, {markerDirPath -> MarkerDirState instance}
+  private final Map markerDirStateMap = new 
HashMap<>();
+  // A long-running thread to dispatch marker creation requests to batch 
processing threads
+  private final MarkerCreationDispatchingRunnable 
markerCreationDispatchingRunnable;
+  private final AtomicBoolean firstCreationRequestSeen;
+  private transient HoodieEngineContext hoodieEngineContext;
+  private ScheduledFuture dispatchingScheduledFuture;
+
+  public MarkerHandler(Configuration conf, TimelineService.Config 
timelineServiceConfig,
+   HoodieEngineContext hoodieEngineContext, FileSystem 
fileSystem,
+   FileSystemViewManager viewManager, Registry 
metricsRegistry) throws IOException {
+super(conf, timelineServiceConfig, fileSystem, viewManager);
+LOG.debug("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+LOG.debug("MarkerHandler batching params: batchNumThreads=" + 
timelineServiceConfig.markerBatchNumThreads
++ " batchIntervalMs=" + timelineServiceConfig.markerBatchIntervalMs + 
"ms");
+

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-04 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r682348725



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.timeline.service.TimelineService;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationCompletableFuture;
+import 
org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
+import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
+
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+
+  private final Registry metricsRegistry;
+  // a scheduled executor service to schedule dispatching of marker creation 
requests
+  private final ScheduledExecutorService dispatchingExecutorService;
+  // an executor service to schedule the worker threads of batch processing 
marker creation requests
+  private final ExecutorService batchingExecutorService;
+  // Parallelism for reading and deleting marker files
+  private final int parallelism;
+  // Marker directory states, {markerDirPath -> MarkerDirState instance}
+  private final Map markerDirStateMap = new 
HashMap<>();
+  // A long-running thread to dispatch marker creation requests to batch 
processing threads

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-03 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r681559342



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##
@@ -304,7 +304,7 @@ protected void postCommit(HoodieTable>, List,
 Option> extraMetadata) {
 try {
   // Delete the marker directory for the instant.
-  new MarkerFiles(createTable(config, hadoopConf), instantTime)
+  MarkerFilesFactory.get(config.getMarkersType(), createTable(config, 
hadoopConf), instantTime)

Review comment:
   I haven't tested that.  Created a ticket here to track this: 
https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##
@@ -175,70 +165,30 @@ public static String stripMarkerSuffix(String path) {
 return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-new Path(String.format("%s/%s/%s", basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-int begin = fullMarkerPath.indexOf(markerRootPath);
-ValidationUtils.checkArgument(begin >= 0,
-"Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected 
Marker Root=" + markerRootPath);
-return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be 
/.hoodie/.temp//2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  @Override
+  protected Option create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists) {
+HoodieTimer timer = new HoodieTimer().startTimer();
 Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+Path dirPath = markerPath.getParent();
 try {
-  LOG.info("Creating Marker Path=" + markerPath);
-  fs.create(markerPath, false).close();
+  if (!fs.exists(dirPath)) {

Review comment:
   I keep the actual logic of marker creation in the existing marker file 
mechanism the same.  In the old class, each marker creation (`create()`) calls 
`getMarkerPath()`, which invokes `fs.exists(path)`.  In the new class, I 
changed the `getMarkerPath()` method to only construct the path instead of 
doing any file system I/O operations, so that `getMarkerPath()` can be also 
used in the timeline-server-based marker file class.  In such a way, to keep 
the logic the same for `DirectMarkerFiles`, the `fs.exists(dirPath)` is added 
here.
   
   We can optimize the flow in a follow-up PR.

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680737342



##
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestMarkerFiles.java
##
@@ -25,49 +25,27 @@
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestMarkerFiles extends HoodieCommonTestHarness {
-
-  private MarkerFiles markerFiles;
-  private FileSystem fs;
-  private Path markerFolderPath;
-  private JavaSparkContext jsc;
-  private HoodieSparkEngineContext context;
-
-  @BeforeEach
-  public void setup() throws IOException {
-initPath();
-initMetaClient();
-this.jsc = new 
JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName()));
-this.context = new HoodieSparkEngineContext(jsc);
-this.fs = FSUtils.getFs(metaClient.getBasePath(), 
metaClient.getHadoopConf());
-this.markerFolderPath =  new Path(metaClient.getMarkerFolderPath("000"));
-this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), 
markerFolderPath.toString(), "000");
-  }
+public abstract class TestMarkerFiles extends HoodieCommonTestHarness {

Review comment:
   Fixed.

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
##
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
+
+/**
+ * Stores the state of a marker directory.
+ *
+ * The operations inside this class is designed to be thread-safe.
+ */
+public class MarkerDirState implements Serializable {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
+  // Marker directory
+  private final String markerDirPath;
+  private final FileSystem fileSystem;
+  // A cached copy of all markers in memory
+  private final Set allMarkers = new HashSet<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder
+  // for efficient appending
+  // Mapping: {markerFileIndex -> markers}
+  private final Map fileMarkersMap = new HashMap<>();
+  // 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680716767



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationDispatchingRunnable.java
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.util.HoodieTimer;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * A runnable for scheduling batch processing of marker creation requests.
+ */
+public class MarkerCreationDispatchingRunnable implements Runnable {
+  public static final Logger LOG = 
LogManager.getLogger(MarkerCreationDispatchingRunnable.class);
+
+  // Marker directory states, {markerDirPath -> MarkerDirState instance}
+  private final Map markerDirStateMap;
+  private final Registry metricsRegistry;
+  private final ExecutorService executorService;
+  // Batch process interval in milliseconds
+  private final long batchIntervalMs;
+  private boolean isRunning = false;
+
+  public MarkerCreationDispatchingRunnable(
+  Map markerDirStateMap, Registry metricsRegistry,
+  int batchNumThreads, long batchIntervalMs) {
+this.markerDirStateMap = markerDirStateMap;
+this.metricsRegistry = metricsRegistry;
+this.batchIntervalMs = batchIntervalMs;
+this.executorService = Executors.newFixedThreadPool(batchNumThreads);
+this.isRunning = true;
+  }
+
+  public void stop() {
+this.isRunning = false;
+  }
+
+  @Override
+  public void run() {
+while (isRunning) {
+  HoodieTimer timer = new HoodieTimer().startTimer();
+  Map> futureMap =
+  markerDirStateMap.entrySet().stream().collect(
+  Collectors.toMap(Map.Entry::getKey,
+  e -> e.getValue().fetchPendingMarkerCreationRequests()));
+  executorService.execute(
+  new MarkerCreationBatchingRunnable(markerDirStateMap, 
metricsRegistry, futureMap));
+
+  try {
+Thread.sleep(Math.max(batchIntervalMs - timer.endTimer(), 0L));

Review comment:
   Per offline discussion, I changed the scheduling logic into two parts: 
(1) dispatching of requests in one scheduled executor service which pulls 
requests and starts a worker thread periodically, and (2) the actual processing 
of requests in the worker runnable, where all the marker creation processing 
and file updates happen.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680701808



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680701935



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680701459



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680698723



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680698426



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680694481



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-08-02 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680693810



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-30 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680311786



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-30 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680311690



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-30 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680311587



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-30 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680311535



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();

Review comment:
   Yes, I added a new class, `MarkerDirState`, to store the states of a 
single marker directory and operate on the markers.

##
File path: 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-30 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r680310395



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;

Review comment:
   Based on the discussion, we don't use this config anymore.

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r677119308



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r677110738



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -228,6 +229,30 @@
   + "files from lake storage, before committing the write. Reduce this 
value, if the high number of tasks incur delays for smaller tables "
   + "or low latency writes.");
 
+  public static final ConfigProperty MARKERS_TYPE = ConfigProperty

Review comment:
   Fixed.

##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();

Review comment:
   Yes, 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r677110192



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
##
@@ -52,36 +53,50 @@
 
   private int serverPort;
   private Configuration conf;
+  private transient HoodieEngineContext context;
   private transient FileSystem fs;
   private transient Javalin app = null;
   private transient FileSystemViewManager fsViewsManager;
   private final int numThreads;
   private final boolean shouldCompressOutput;
   private final boolean useAsync;
+  private final int markerBatchNumThreads;

Review comment:
   These have been included in the TimelineService.Config and we use the 
config instance to pass all config values.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r677107014



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List markerFilesUseStatus;

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676777804



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List markerFilesUseStatus;

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676777481



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+/**
+ * REST Handler servicing marker requests.
+ *
+ * The marker creation requests are handled asynchronous, while other types of 
requests
+ * are handled synchronous.
+ *
+ * Marker creation requests are batch processed periodically by a thread.  
Each batch
+ * processing thread adds new markers to a marker file.  Given that marker 
file operation
+ * can take time, multiple concurrent threads can run at the same, while they 
operate
+ * on different marker files storing mutually exclusive marker entries.  At 
any given
+ * time, a marker file is touched by at most one thread to guarantee 
consistency.
+ * Below is an example of running batch processing threads.
+ *
+ *   |-| batch interval
+ * Thread 1  |-->| writing to MARKERS1
+ * Thread 2|-->| writing to MARKERS2
+ * Thread 3   |-->| writing to MARKERS3
+ */
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  // Margin time for scheduling the processing of the next batch of marker 
creation requests
+  private static final long SCHEDULING_MARGIN_TIME_MS = 5L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // A cached copy of all markers in memory
+  // Mapping: {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // A cached copy of marker entries in each marker file, stored in 
StringBuilder for efficient appending
+  // Mapping: {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  // A list of pending futures from async marker creation requests
+  private final List createMarkerFutures = new 
ArrayList<>();
+  // A list of use status of marker files. {@code true} means the file is in 
use by a {@code BatchCreateMarkerRunnable}
+  private final List markerFilesUseStatus;

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676771224



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map> fileMarkersMap = new 
HashMap<>();
+  private final List createMarkerFutures = new 
ArrayList<>();
+  private final List isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, 
FileSystemViewManager viewManager, Registry metricsRegistry,
+   int batchNumThreads, long batchIntervalMs, int 
parallelism) throws IOException {
+super(conf, fileSystem, viewManager);
+LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+this.metricsRegistry = metricsRegistry;
+this.batchIntervalMs = batchIntervalMs;
+this.parallelism = parallelism;
+this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+List isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+for (int i = 0; i < batchNumThreads; i++) {
+  isMarkerFileInUseList.add(false);
+}
+this.isMarkerFileInUseList = 
Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set getAllMarkers(String markerDirPath) {
+return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set getCreateAndMergeMarkers(String markerDirPath) {
+return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+.filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+.collect(Collectors.toSet());
+  }
+
+  public CompletableFuture createMarker(Context context, String 
markerDirPath, String markerName) {
+LOG.info("Request: 

[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676761459



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectMarkerFiles.java
##
@@ -175,70 +164,30 @@ public static String stripMarkerSuffix(String path) {
 return markerFiles;
   }
 
-  private String stripMarkerFolderPrefix(String fullMarkerPath) {
-
ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
-String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
-new Path(String.format("%s/%s/%s", basePath, 
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
-int begin = fullMarkerPath.indexOf(markerRootPath);
-ValidationUtils.checkArgument(begin >= 0,
-"Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected 
Marker Root=" + markerRootPath);
-return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
-  }
-
-  /**
-   * The marker path will be 
/.hoodie/.temp//2019/04/25/filename.marker.writeIOType.
-   */
-  public Path create(String partitionPath, String dataFileName, IOType type) {
+  @Override
+  protected Option create(String partitionPath, String dataFileName, 
IOType type, boolean checkIfExists) {
+LOG.info("[direct] Create marker file : " + partitionPath + " " + 
dataFileName);
+long startTimeMs = System.currentTimeMillis();
 Path markerPath = getMarkerPath(partitionPath, dataFileName, type);
+Path dirPath = markerPath.getParent();
 try {
-  LOG.info("Creating Marker Path=" + markerPath);
-  fs.create(markerPath, false).close();
+  if (!fs.exists(dirPath)) {

Review comment:
   Actually, I refactored `getMarkerPath()` and moved the directory path 
check to the `create()`.  So the actual logic is the same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676752617



##
File path: 
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
##
@@ -392,6 +413,44 @@ private void registerFileSlicesAPI() {
 }, true));
   }
 
+  private void registerMarkerAPI() {

Review comment:
   Sg.  I moved marker request logic from TableFileSystemView to 
TimelineServerBasedMarkerFiles.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] yihua commented on a change in pull request #3233: [HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency

2021-07-26 Thread GitBox


yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r676209554



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -222,6 +224,30 @@
   + "files from lake storage, before committing the write. Reduce this 
value, if the high number of tasks incur delays for smaller tables "
   + "or low latency writes.");
 
+  public static final ConfigProperty MARKERS_IO_MODE = ConfigProperty
+  .key("hoodie.markers.io.mode")
+  .defaultValue(MarkerIOMode.TIMELINE_BASED.toString())
+  .sinceVersion(VERSION_0_9_0)

Review comment:
   I hardcoded the version.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/MarkerFiles.java
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Operates on marker files for a given write action (commit, delta commit, 
compaction).
+ *
+ * This abstract class provides abstract methods of different marker file 
operations, so that
+ * different marker file mechanism can be implemented.
+ */
+public abstract class MarkerFiles implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
+
+  protected final String basePath;
+  protected final transient Path markerDirPath;
+  protected final String instantTime;
+
+  public MarkerFiles(String basePath, String markerFolderPath, String 
instantTime) {
+this.basePath = basePath;
+this.markerDirPath = new Path(markerFolderPath);
+this.instantTime = instantTime;
+  }
+
+  /**
+   * The marker path will be 
/.hoodie/.temp//2019/04/25/filename.marker.writeIOType.

Review comment:
   This is the old javadocs.  I changed it according to the changes.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineBasedMarkerFiles.java
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Marker file operations of using timeline service as a proxy to create and 
delete marker files.
+ * Each data file has a corresponding marker entry, which is stored in a 
limited number of
+ * marker files maintained by the