[ https://issues.apache.org/jira/browse/HUDI-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17383830#comment-17383830 ]

ASF GitHub Bot commented on HUDI-1138:
--------------------------------------

yihua commented on a change in pull request #3233:
URL: https://github.com/apache/hudi/pull/3233#discussion_r672869592



##########
File path: hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
+
+public class MarkerHandler extends Handler {
+  public static final String MARKERS_FILENAME = "MARKERS";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
+  private static final long MARGIN_TIME_MS = 10L;
+
+  private final Registry metricsRegistry;
+  private final ScheduledExecutorService executorService;
+  // {markerDirPath -> all markers}
+  private final Map<String, Set<String>> allMarkersMap = new HashMap<>();
+  // {markerDirPath -> {markerFileIndex -> markers}}
+  private final Map<String, Map<Integer, StringBuilder>> fileMarkersMap = new HashMap<>();
+  private final List<CreateMarkerCompletableFuture> createMarkerFutures = new ArrayList<>();
+  private final List<Boolean> isMarkerFileInUseList;
+  private final long batchIntervalMs;
+  private final int parallelism;
+  private volatile Object createMarkerRequestlockObject = new Object();
+  private long nextBatchProcessTimeMs = 0L;
+
+  public MarkerHandler(Configuration conf, FileSystem fileSystem, FileSystemViewManager viewManager, Registry metricsRegistry,
+                       int batchNumThreads, long batchIntervalMs, int parallelism) throws IOException {
+    super(conf, fileSystem, viewManager);
+    LOG.info("*** MarkerHandler FileSystem: " + this.fileSystem.getScheme());
+    LOG.info("*** MarkerHandler Params: batchNumThreads=" + batchNumThreads + 
" batchIntervalMs=" + batchIntervalMs + "ms");
+    this.metricsRegistry = metricsRegistry;
+    this.batchIntervalMs = batchIntervalMs;
+    this.parallelism = parallelism;
+    this.executorService = Executors.newScheduledThreadPool(batchNumThreads);
+    List<Boolean> isMarkerFileInUseList = new ArrayList<>(batchNumThreads);
+    for (int i = 0; i < batchNumThreads; i++) {
+      isMarkerFileInUseList.add(false);
+    }
+    this.isMarkerFileInUseList = Collections.synchronizedList(isMarkerFileInUseList);
+  }
+
+  public Set<String> getAllMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>());
+  }
+
+  public Set<String> getCreateAndMergeMarkers(String markerDirPath) {
+    return allMarkersMap.getOrDefault(markerDirPath, new HashSet<>()).stream()
+        .filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
+        .collect(Collectors.toSet());
+  }
+
+  public CompletableFuture<String> createMarker(Context context, String markerDirPath, String markerName) {
+    LOG.info("Request: create marker " + markerDirPath + " " + markerName);
+    CreateMarkerCompletableFuture future = new CreateMarkerCompletableFuture(context, markerDirPath, markerName);
+    synchronized (createMarkerRequestlockObject) {
+      createMarkerFutures.add(future);
+      long currTimeMs = System.currentTimeMillis();
+      if (currTimeMs >= nextBatchProcessTimeMs - MARGIN_TIME_MS) {
+        nextBatchProcessTimeMs += batchIntervalMs * (Math.max((currTimeMs - nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L));
+
+        long waitMs = nextBatchProcessTimeMs - System.currentTimeMillis();
+        executorService.schedule(
+            new BatchCreateMarkerRunnable(), Math.max(0L, waitMs), TimeUnit.MILLISECONDS);
+        LOG.info("Wait for " + waitMs + " ms, next batch time: " + nextBatchProcessTimeMs);
+      }
+    }
+    return future;
+  }
+
+  public Boolean deleteMarkers(String markerDir) {
+    Path markerDirPath = new Path(markerDir);
+    try {
+      if (fileSystem.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fileSystem.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          for (String subPathStr: markerDirSubPaths) {
+            fileSystem.delete(new Path(subPathStr), true);
+          }
+        }
+
+        boolean result = fileSystem.delete(markerDirPath, true);
+        LOG.info("Removing marker directory at " + markerDirPath);
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  private Set<String> getAllMarkersFromFile(String markerDirPath) {
+    LOG.info("Get all markers from " + markerDirPath);
+    Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME);
+    Set<String> markers = new HashSet<>();
+    try {
+      if (fileSystem.exists(markersFilePath)) {

Review comment:
       Fixed.
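For readers following the batching in createMarker above: each incoming request is appended to createMarkerFutures, and a batch-processing task is scheduled only once the current time has reached (within MARGIN_TIME_MS) the next batch boundary; nextBatchProcessTimeMs is then advanced to the first multiple of batchIntervalMs past the current time. A standalone arithmetic sketch of that scheduling step, with made-up numbers (it mirrors the expression in the diff but is not part of the PR):

// Illustrative only: mirrors the scheduling arithmetic in createMarker above,
// with made-up numbers; not part of the PR.
public class BatchTimeExample {
  public static void main(String[] args) {
    long batchIntervalMs = 50L;
    long nextBatchProcessTimeMs = 1_000L;   // previously scheduled batch boundary
    long currTimeMs = 1_130L;               // "now", already past that boundary

    // Advance the boundary to the first multiple of batchIntervalMs after now:
    // (1130 - 1000) / 50 + 1 = 3, so the boundary moves 3 * 50 = 150 ms, to 1150.
    nextBatchProcessTimeMs += batchIntervalMs
        * Math.max((currTimeMs - nextBatchProcessTimeMs) / batchIntervalMs + 1L, 1L);

    long waitMs = nextBatchProcessTimeMs - currTimeMs;  // 20 ms until the batch runs
    System.out.println("next batch at " + nextBatchProcessTimeMs + ", wait " + waitMs + " ms");
  }
}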






> Re-implement marker files via timeline server
> ---------------------------------------------
>
>                 Key: HUDI-1138
>                 URL: https://issues.apache.org/jira/browse/HUDI-1138
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Writer Core
>    Affects Versions: 0.9.0
>            Reporter: Vinoth Chandar
>            Assignee: Ethan Guo
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Even though one could argue that RFC-15/consolidated metadata removes the need 
> for deleting partial files written due to Spark task failures/stage retries, it 
> will still leave extra files inside the table (and users will pay for them 
> every month), so we need the marker mechanism to be able to delete these 
> partial files.
> Here we explore whether we can improve the current marker file mechanism, which 
> creates one marker file per data file written, by delegating the createMarker() 
> call to the driver/timeline server and having it write marker metadata into a 
> single file handle that is flushed for durability guarantees.
>  
> P.S.: I was tempted to think the Spark listener mechanism could help us deal 
> with failed tasks, but it has no guarantees: the writer job could die without 
> deleting a partial file. That is, it can improve things, but it cannot provide 
> guarantees.
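
A minimal client-side sketch of the idea described above: each writer sends a create-marker request to the timeline server and treats the marker as created only once the server acknowledges it, i.e. after the batched marker file has been flushed. The endpoint path, query parameter names, port, and file names below are assumptions for the sketch, not the actual Hudi timeline-server API.

// Hypothetical illustration only: endpoint path, parameter names, port, and
// file names are assumptions, not the actual Hudi timeline-server API.
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class MarkerClientSketch {
  public static void main(String[] args) throws Exception {
    String timelineServer = "http://localhost:26754";          // assumed host:port
    String markerDir = "/tmp/hudi_table/.hoodie/.temp/001";    // assumed marker directory
    String markerName = "file-group-1_0-1-1_001.parquet.marker.CREATE";

    // One lightweight HTTP call per marker; the timeline server is expected to
    // batch many such requests and append them to a single marker file.
    String query = "markerdirpath=" + URLEncoder.encode(markerDir, StandardCharsets.UTF_8)
        + "&markername=" + URLEncoder.encode(markerName, StandardCharsets.UTF_8);
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(timelineServer + "/v1/hoodie/marker/create?" + query))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    // The response arrives only after the server has flushed the batch, which is
    // what provides the durability guarantee described in the issue.
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println("Marker creation acknowledged: " + response.body());
  }
}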


