[ 
https://issues.apache.org/jira/browse/BEAM-5309?focusedWorklogId=165330&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165330
 ]

ASF GitHub Bot logged work on BEAM-5309:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Nov/18 09:31
            Start Date: 13/Nov/18 09:31
    Worklog Time Spent: 10m 
      Work Description: b923 commented on a change in pull request #6691: 
WIP:[BEAM-5309] Add streaming support for HadoopFormatIO
URL: https://github.com/apache/beam/pull/6691#discussion_r232957141
 
 

 ##########
 File path: 
sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
 ##########
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.format;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ExternalSynchronization} which registers locks in HDFS.
+ *
+ * <p>Requires {@code locksDir} to be specified. This directory MUST be different from the
+ * directory possibly stored under the {@code "mapreduce.output.fileoutputformat.outputdir"}
+ * key. Otherwise job setup will fail because the directory will already exist before job setup.
+ */
+public class HDFSSynchronization implements ExternalSynchronization {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HDFSSynchronization.class);
+
+  private static final String LOCKS_DIR_PATTERN = "%s/";
+  private static final String LOCKS_DIR_TASK_PATTERN = LOCKS_DIR_PATTERN + 
"%s";
+  private static final String LOCKS_DIR_TASK_ATTEMPT_PATTERN = 
LOCKS_DIR_TASK_PATTERN + "_%s";
+  private static final String LOCKS_DIR_JOB_FILENAME = LOCKS_DIR_PATTERN + 
"_job";
+
+  private static final transient Random RANDOM_GEN = new Random();
+
+  private final String locksDir;
+  private ThrowingFunction<Configuration, FileSystem, IOException> 
fileSystemFactory;
+
+  /**
+   * Creates an instance of {@link HDFSSynchronization}.
+   *
+   * @param locksDir directory where locks will be stored. This directory MUST be different from
+   *     the directory possibly stored under the {@code
+   *     "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise job setup will fail
+   *     because the directory will already exist before job setup.
+   */
+  public HDFSSynchronization(String locksDir) {
+    this(locksDir, FileSystem::get);
+  }
+
+  /**
+   * Creates an instance of {@link HDFSSynchronization}.
+   *
+   * @param locksDir directory where locks will be stored. This directory MUST be different from
+   *     the directory possibly stored under the {@code
+   *     "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise job setup will fail
+   *     because the directory will already exist before job setup.
+   * @param fileSystemFactory function that provides the file system for a given configuration
+   */
+  HDFSSynchronization(
+      String locksDir, ThrowingFunction<Configuration, FileSystem, 
IOException> fileSystemFactory) {
+    this.locksDir = locksDir;
+    this.fileSystemFactory = fileSystemFactory;
+  }
+
+  @Override
+  public boolean tryAcquireJobLock(Configuration conf) {
+    Path path = new Path(locksDir, String.format(LOCKS_DIR_JOB_FILENAME, 
getJobJtIdentifier(conf)));
+
+    return tryCreateFile(conf, path);
+  }
+
+  @Override
+  public void releaseJobIdLock(Configuration conf) {
+    Path path = new Path(locksDir, String.format(LOCKS_DIR_PATTERN, 
getJobJtIdentifier(conf)));
+
+    try {
+      if (FileSystem.get(conf).delete(path, true)) {
+        LOGGER.info("Delete of lock directory {} was successful", path);
+      } else {
+        LOGGER.warn("Delete of lock directory {} was unsuccessful", path);
+      }
+
+    } catch (IOException e) {
+      String formattedExceptionMessage =
+          String.format("Delete of lock directory %s was unsuccessful", path);
+      LOGGER.warn(formattedExceptionMessage, e);
+      throw new IllegalStateException(formattedExceptionMessage, e);
+    }
+  }
+
+  @Override
+  public TaskID acquireTaskIdLock(Configuration conf) {
+    JobID jobId = HadoopFormats.getJobId(conf);
+    boolean lockAcquired = false;
+    int taskIdCandidate = 0;
+
+    while (!lockAcquired) {
+      taskIdCandidate = RANDOM_GEN.nextInt(Integer.MAX_VALUE);
+      Path path =
+          new Path(
+              locksDir,
+              String.format(LOCKS_DIR_TASK_PATTERN, getJobJtIdentifier(conf), 
taskIdCandidate));
+      lockAcquired = tryCreateFile(conf, path);
+    }
+
+    return HadoopFormats.createTaskID(jobId, taskIdCandidate);
+  }
+
+  @Override
+  public TaskAttemptID acquireTaskAttemptIdLock(Configuration conf, int 
taskId) {
+    String jobJtIdentifier = getJobJtIdentifier(conf);
+    JobID jobId = HadoopFormats.getJobId(conf);
+    int taskAttemptCandidate = 0;
+    boolean taskAttemptAcquired = false;
+
+    while (!taskAttemptAcquired) {
+      taskAttemptCandidate++;
+      Path path =
+          new Path(
+              locksDir,
+              String.format(
+                  LOCKS_DIR_TASK_ATTEMPT_PATTERN, jobJtIdentifier, taskId, 
taskAttemptCandidate));
+      taskAttemptAcquired = tryCreateFile(conf, path);
+    }
+
+    return HadoopFormats.createTaskAttemptID(jobId, taskId, 
taskAttemptCandidate);
+  }
+
+  private boolean tryCreateFile(Configuration conf, Path path) {
+    try {
+      FileSystem fileSystem = fileSystemFactory.apply(conf);
+
+      try {
 
 Review comment:
   Fixed, thank you.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165330)
    Time Spent: 9h 10m  (was: 9h)

> Add streaming support for HadoopOutputFormatIO
> ----------------------------------------------
>
>                 Key: BEAM-5309
>                 URL: https://issues.apache.org/jira/browse/BEAM-5309
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-hadoop
>            Reporter: Alexey Romanenko
>            Assignee: David Hrbacek
>            Priority: Minor
>          Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> design doc: https://s.apache.org/beam-streaming-hofio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to