steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1177720020


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A writer thread reads from a queue containing
+   * lists of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * These threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main trouble spots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * Count of file entries saved; only updated in the writer thread
+     * but read from others, so atomic.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return true if the entries were accepted; false if the writer is inactive or the put was interrupted.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          // restore the interrupt status and report the enqueue as failed
+          Thread.currentThread().interrupt();
+          return false;
+        }
+        }
+      } else {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());

Review Comment:
   Pretty much never, so yes, make it a warn. Only likely if there's been a
   failure in one task and the other threads haven't noticed yet.
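
   A minimal sketch of the suggested change in enqueue(), reusing the log
   message from the diff above; the warning text and the trailing return are
   illustrative assumptions, not the final patch:

       } else {
         // assumption: warn rather than debug, since an inactive queue normally
         // means a write failure has already stopped the writer thread
         LOG.warn("Queue inactive; discarding {} entries", entries.size());
         return false;
       }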
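
   For wider context, a hedged sketch of how the queueing design described in
   the EntryWriter javadoc is meant to be driven, using only methods visible in
   this diff (createWriter, launchEntryWriter, enqueue, createReader,
   iterateOver). The temp file and the entriesFromManifest list are
   placeholders; the enclosing method, imports and exception handling are
   elided:

       EntryFileIO entryFileIO = new EntryFileIO(new Configuration());
       File entryFile = File.createTempFile("entries", ".seq");

       // single writer thread draining a bounded queue of entry lists
       try (EntryFileIO.EntryWriter entryWriter =
                entryFileIO.launchEntryWriter(entryFileIO.createWriter(entryFile), 32)) {
         // loader threads enqueue lists of FileEntry instances;
         // they block once the queue capacity is reached
         entryWriter.enqueue(entriesFromManifest);
       }

       // later: iterate over the saved entries sequentially
       RemoteIterator<FileEntry> it =
           entryFileIO.iterateOver(entryFileIO.createReader(entryFile));
       while (it.hasNext()) {
         FileEntry entry = it.next();
         // process the entry
       }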



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

