[ 
https://issues.apache.org/jira/browse/HADOOP-17195?focusedWorklogId=653000&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-653000
 ]

ASF GitHub Bot logged work on HADOOP-17195:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Sep/21 14:30
            Start Date: 20/Sep/21 14:30
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request 
#3446:
URL: https://github.com/apache/hadoop/pull/3446#discussion_r712223092



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
##########
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
+import static 
org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * A class to provide disk, byteBuffer and byteArray option for Filesystem
+ * OutputStreams.
+ * <ul>
+ *   <li>
+ *     Disk: Uses Disk space to write the blocks. Is suited best to avoid
+ *     OutOfMemory Errors in Java heap space.
+ *   </li>
+ *   <li>
+ *     byteBuffer: Uses DirectByteBuffer to allocate memory off-heap to
+ *     provide faster writing of DataBlocks with some risk of running
+ *     OutOfMemory.
+ *   </li>
+ *   <li>
+ *     byteArray: Uses byte[] to write a block of data. On heap and does have
+ *     risk of running OutOfMemory fairly easily.
+ *   </li>
+ * </ul>
+ * <p>
+ * Implementation of DataBlocks taken from HADOOP-13560 to support huge file
+ * uploads in S3A with different options rather than one.
+ */
+public final class DataBlocks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataBlocks.class);
+
+  /**
+   * Buffer blocks to disk.
+   * Capacity is limited to available disk space.
+   */
+  public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
+
+  /**
+   * Use a byte buffer.
+   */
+  public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer";
+
+  /**
+   * Use an in-memory array. Fast but will run of heap rapidly.
+   */
+  public static final String DATA_BLOCKS_BUFFER_ARRAY = "array";
+
+  private DataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   *
+   * @param b   byte array containing data.
+   * @param off offset in array where to start.
+   * @param len number of bytes to be written.
+   * @throws NullPointerException      for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  public static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   *
+   * @param keyToBufferDir Key to buffer directory config for a FS.
+   * @param configuration  factory configurations.
+   * @param name           factory name -the option from {@link 
CommonConfigurationKeys}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  public static BlockFactory createFactory(String keyToBufferDir,
+      Configuration configuration,
+      String name) {
+    LOG.debug("Creating DataFactory of type : {}", name);
+    switch (name) {
+    case DATA_BLOCKS_BUFFER_ARRAY:
+      return new ArrayBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BUFFER_DISK:
+      return new DiskBlockFactory(keyToBufferDir, configuration);
+    case DATA_BLOCKS_BYTEBUFFER:
+      return new ByteBufferBlockFactory(keyToBufferDir, configuration);
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file, an input stream or a byteArray.
+   * {@link #toByteArray()} method to be used to convert the data into byte
+   * array to be doen in this class as well.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  public static final class BlockUploadData implements Closeable {

Review comment:
       just the way I did it at the time: I wanted to keep everything in one 
place




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 653000)
    Time Spent: 5h 40m  (was: 5.5h)

> Intermittent OutOfMemory error while performing hdfs CopyFromLocal to abfs 
> ---------------------------------------------------------------------------
>
>                 Key: HADOOP-17195
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17195
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/azure
>    Affects Versions: 3.3.0
>            Reporter: Mehakmeet Singh
>            Assignee: Mehakmeet Singh
>            Priority: Major
>              Labels: abfsactive, pull-request-available
>          Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> OutOfMemory error due to new ThreadPools being made each time 
> AbfsOutputStream is created. Since threadPool aren't limited a lot of data is 
> loaded in buffer and thus it causes OutOfMemory error.
> Possible fixes:
> - Limit the number of ThreadCounts while performing hdfs copyFromLocal (Using 
> -t property).
> - Reducing OUTPUT_BUFFER_SIZE significantly which would limit the amount of 
> buffer to be loaded in threads.
> - Don't create new ThreadPools each time AbfsOutputStream is created and 
> limit the number of ThreadPools each AbfsOutputStream could create.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to