steveloughran commented on a change in pull request #2014:
URL: https://github.com/apache/hadoop/pull/2014#discussion_r439326950



##########
File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
##########
@@ -207,6 +213,92 @@ public Void call() throws Exception {
     assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
   }
 
+  @Test
+  public void testShouldUseOlderAbfsOutputStreamConf() throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path testPath = new Path(methodName.getMethodName() + "1");
+    getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .setShouldUseOlderAbfsOutputStream(true);
+    try (FSDataOutputStream stream = fs.create(testPath)) {
+      Assertions.assertThat(stream.getWrappedStream()).describedAs("When the "
+          + "shouldUseOlderAbfsOutputStream is set the wrapped stream inside "
+          + "the FSDataOutputStream object should be of class "
+          + "AbfsOutputStreamOld.").isInstanceOf(AbfsOutputStreamOld.class);
+    }
+    testPath = new Path(methodName.getMethodName());
+    getFileSystem().getAbfsStore().getAbfsConfiguration()
+        .setShouldUseOlderAbfsOutputStream(false);
+    try (FSDataOutputStream stream = fs.create(testPath)) {
+      Assertions.assertThat(stream.getWrappedStream()).describedAs("When the "
+          + "shouldUseOlderAbfsOutputStream is set the wrapped stream inside "
+          + "the FSDataOutputStream object should be of class "
+          + "AbfsOutputStream.").isInstanceOf(AbfsOutputStream.class);
+    }
+  }
+
+  @Test
+  public void testWriteWithMultipleOutputStreamAtTheSameTime()
+      throws IOException, InterruptedException, ExecutionException {
+    AzureBlobFileSystem fs = getFileSystem();
+    String testFilePath = methodName.getMethodName();
+    Path[] testPaths = new Path[CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT];
+    createNStreamsAndWriteDifferentSizesConcurrently(fs, testFilePath,
+        CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths);
+    assertSuccessfulWritesOnAllStreams(fs,
+        CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths);
+  }
+
+  private void assertSuccessfulWritesOnAllStreams(final FileSystem fs,
+      final int numConcurrentObjects, final Path[] testPaths)
+      throws IOException {
+    for (int i = 0; i < numConcurrentObjects; i++) {
+      FileStatus fileStatus = fs.getFileStatus(testPaths[i]);
+      int numWritesMadeOnStream = i + 1;
+      long expectedLength = TEST_BUFFER_SIZE * numWritesMadeOnStream;
+      assertThat(fileStatus.getLen(), is(equalTo(expectedLength)));
+    }
+  }
+
+  private void createNStreamsAndWriteDifferentSizesConcurrently(
+      final FileSystem fs, final String testFilePath,
+      final int numConcurrentObjects, final Path[] testPaths)
+      throws ExecutionException, InterruptedException {
+    final byte[] b = new byte[TEST_BUFFER_SIZE];
+    new Random().nextBytes(b);
+    final ExecutorService es = Executors.newFixedThreadPool(40);
+    final List<Future<Void>> futureTasks = new ArrayList<>();
+    for (int i = 0; i < numConcurrentObjects; i++) {
+      Path testPath = new Path(testFilePath + i);
+      testPaths[i] = testPath;
+      int numWritesToBeDone = i + 1;
+      futureTasks.add(es.submit(() -> {
+        try (FSDataOutputStream stream = fs.create(testPath)) {
+          makeNWritesToStream(stream, numWritesToBeDone, b, es);
+        }
+        return null;
+      }));
+    }
+    for (Future<Void> futureTask : futureTasks) {
+      futureTask.get();
+    }
+    es.shutdownNow();
+  }
+
+  private void makeNWritesToStream(final FSDataOutputStream stream,
+      final int numWrites, final byte[] b, final ExecutorService es)
+      throws ExecutionException, InterruptedException, IOException {
+    final List<Future<Void>> futureTasks = new ArrayList<>();
+    for (int i = 0; i < numWrites; i++) {
+      futureTasks.add(es.submit(() -> {
+        stream.write(b);
+        return null;
+      }));
+    }
+    for (Future<Void> futureTask : futureTasks) {

Review comment:
       see if you can use org.apache.hadoop.fs.impl.FutureIOSupport here. And 
somewhere there's a method to block waiting for futures to complete without 
doing it sequentally; I believe it is faster

##########
File path: hadoop-tools/hadoop-azure/pom.xml
##########
@@ -172,6 +172,12 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>

Review comment:
       1. can you add to hadoop project pom and then refer here. its how we 
guarantee consistent versions.
   2. do we really need to add a new JAR into production just for annotations? 
if that is all it is for, maybe we could somehow avoid doing that
   
   which annotations is it actually for? as VisibleForTesting is in guava

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java
##########
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+import static java.lang.Math.ceil;
+import static java.lang.Math.min;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+
+/**
+ * Pool for byte[]

Review comment:
       . at ehd for javadoc in java 8

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
##########
@@ -108,23 +106,64 @@ public AbfsOutputStream(
             .isDisableOutputStreamFlush();
     this.lastError = null;
     this.lastFlushOffset = 0;
-    this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
-    this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
-    this.writeOperations = new ConcurrentLinkedDeque<>();
     this.outputStreamStatistics = 
abfsOutputStreamContext.getStreamStatistics();
-
-    this.maxConcurrentRequestCount = 4 * 
Runtime.getRuntime().availableProcessors();
-
-    this.threadExecutor
-        = new ThreadPoolExecutor(maxConcurrentRequestCount,
-        maxConcurrentRequestCount,
-        10L,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>());
-    this.completionService = new 
ExecutorCompletionService<>(this.threadExecutor);
     this.cachedSasToken = new CachedSASToken(
         abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+
+    init(abfsOutputStreamContext);
+    buffer = new byte[bufferSize];
+  }
+
+  private void init(final AbfsOutputStreamContext abfsOutputStreamContext) {
+    if (isCommonPoolsInitialised()) {
+      return;
+    }
+
+    initWriteBufferPool(abfsOutputStreamContext);
+
+    ThreadFactory daemonThreadFactory = new ThreadFactory() {

Review comment:
       there's some hadoop thread factory you should be able to lift. Also: 
name the threads

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java
##########
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+import static java.lang.Math.ceil;
+import static java.lang.Math.min;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+
+/**
+ * Pool for byte[]
+ */
+public class AbfsByteBufferPool {
+
+  private static final int HUNDRED = 100;
+
+  /**
+   * Queue holding the free buffers.
+   */
+  private ArrayBlockingQueue<byte[]> freeBuffers;
+  /**
+   * Count to track the buffers issued from AbfsByteBufferPool and yet to be
+   * returned.
+   */
+  private int numBuffersInUse;
+
+  private int bufferSize;
+
+  private int maxBuffersToPool;
+  private int maxMemUsagePercentage;
+
+  /**
+   * @param bufferSize            Size of the byte[] to be returned.
+   * @param queueCapacity         Maximum number of buffers that the pool can
+   *                              keep within the pool.
+   * @param maxMemUsagePercentage Maximum percentage of memory that can
+   *                              be used by the pool from the max
+   *                              available memory.
+   */
+  public AbfsByteBufferPool(final int bufferSize, final int queueCapacity,
+      final int maxMemUsagePercentage) {
+    validate(queueCapacity, maxMemUsagePercentage);
+    this.maxMemUsagePercentage = maxMemUsagePercentage;
+    this.bufferSize = bufferSize;
+    this.numBuffersInUse = 0;
+    this.maxBuffersToPool = queueCapacity;
+    freeBuffers = new ArrayBlockingQueue<>(queueCapacity);
+  }
+
+  private void validate(final int queueCapacity,
+      final int maxWriteMemUsagePercentage) {
+    Preconditions.checkArgument(maxWriteMemUsagePercentage
+            >= MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE
+            && maxWriteMemUsagePercentage
+            <= MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        "maxWriteMemUsagePercentage should be in range (%s - %s)",
+        MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE);
+    Preconditions
+        .checkArgument(queueCapacity > 0, "queueCapacity cannot be < 1");
+  }
+
+  private synchronized boolean isPossibleToIssueNewBuffer() {
+    Runtime rt = Runtime.getRuntime();
+    int bufferCountByMaxFreeBuffers =
+        maxBuffersToPool + rt.availableProcessors();

Review comment:
       store availableProcessors in constructor.

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsByteBufferPool.java
##########
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+import static java.lang.Math.ceil;
+import static java.lang.Math.min;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE;
+
+/**
+ * Pool for byte[]
+ */
+public class AbfsByteBufferPool {
+
+  private static final int HUNDRED = 100;
+
+  /**
+   * Queue holding the free buffers.
+   */
+  private ArrayBlockingQueue<byte[]> freeBuffers;
+  /**
+   * Count to track the buffers issued from AbfsByteBufferPool and yet to be
+   * returned.
+   */
+  private int numBuffersInUse;
+
+  private int bufferSize;
+
+  private int maxBuffersToPool;
+  private int maxMemUsagePercentage;
+
+  /**
+   * @param bufferSize            Size of the byte[] to be returned.
+   * @param queueCapacity         Maximum number of buffers that the pool can
+   *                              keep within the pool.
+   * @param maxMemUsagePercentage Maximum percentage of memory that can
+   *                              be used by the pool from the max
+   *                              available memory.
+   */
+  public AbfsByteBufferPool(final int bufferSize, final int queueCapacity,
+      final int maxMemUsagePercentage) {
+    validate(queueCapacity, maxMemUsagePercentage);
+    this.maxMemUsagePercentage = maxMemUsagePercentage;
+    this.bufferSize = bufferSize;
+    this.numBuffersInUse = 0;
+    this.maxBuffersToPool = queueCapacity;
+    freeBuffers = new ArrayBlockingQueue<>(queueCapacity);
+  }
+
+  private void validate(final int queueCapacity,
+      final int maxWriteMemUsagePercentage) {
+    Preconditions.checkArgument(maxWriteMemUsagePercentage
+            >= MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE
+            && maxWriteMemUsagePercentage
+            <= MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        "maxWriteMemUsagePercentage should be in range (%s - %s)",
+        MIN_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE,
+        MAX_VALUE_MAX_AZURE_WRITE_MEM_USAGE_PERCENTAGE);
+    Preconditions
+        .checkArgument(queueCapacity > 0, "queueCapacity cannot be < 1");
+  }
+
+  private synchronized boolean isPossibleToIssueNewBuffer() {
+    Runtime rt = Runtime.getRuntime();

Review comment:
       could this be done outside a sync block? it probably does a JNI call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to