[GitHub] [hadoop] steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common across all AbfsOutputSt

2020-03-18 Thread GitBox
steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to 
prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common 
across all AbfsOutputStream instances
URL: https://github.com/apache/hadoop/pull/1890#discussion_r394372894
 
 

 ##
 File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 ##
 @@ -63,20 +70,53 @@
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
-  private final int maxConcurrentRequestCount;
+
+  private static int maxConcurrentRequestcount;
+  private static int maxBufferCount;
 
   private ConcurrentLinkedDeque writeOperations;
-  private final ThreadPoolExecutor threadExecutor;
-  private final ExecutorCompletionService completionService;
+  private static final Object INIT_LOCK = new Object();
+  private static ThreadPoolExecutor threadExecutor;
+  private static ExecutorCompletionService completionService;
+
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int HUNDRED_MB = 100 * ONE_MB;
+  private static final int MIN_MEMORY_THRESHOLD = HUNDRED_MB;
 
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
    * blocks. After the data is sent to the service, the buffer is returned
    * back to the queue
    */
-  private final ElasticByteBufferPool byteBufferPool
-  = new ElasticByteBufferPool();
+  private static final ElasticByteBufferPool BYTE_BUFFER_POOL
+  = new ElasticByteBufferPool();
+  private static AtomicInteger buffersToBeReturned = new AtomicInteger(0);
+
+  static {
+    if (threadExecutor == null) {
+      synchronized (INIT_LOCK) {
+        if (threadExecutor == null) {
+          int availableProcessors = Runtime.getRuntime().availableProcessors();
+          maxConcurrentRequestcount = 4 * availableProcessors;
+          maxBufferCount = maxConcurrentRequestcount + availableProcessors + 1;
 
 Review comment:
   I don't like the hard-coded assumptions about the number of CPUs and the
 amount of space that can be used for buffering.
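To put rough numbers on that concern, the sketch below replays the sizing formula from the diff (4 * cores request threads, plus cores + 1 extra buffers). It then computes the worst-case heap that buffers would hold if every one of them were allocated at once. The class name is hypothetical. The 8 MB buffer size is an assumption for illustration and is not a value taken from this patch.

```java
// Hypothetical helper that reproduces the sizing formula from the diff
// (4 * cores request threads, plus cores + 1 extra buffers). It shows how
// the worst-case buffer memory grows with the reported CPU count.
final class AbfsBufferSizingSketch {
  private static final long ONE_MB = 1024L * 1024L;

  // Same formula as maxConcurrentRequestcount in the patch.
  static int maxConcurrentRequests(int cores) {
    return 4 * cores;
  }

  // Same formula as maxBufferCount in the patch.
  static int maxBufferCount(int cores) {
    return maxConcurrentRequests(cores) + cores + 1;
  }

  // Upper bound on the heap held by buffers, assuming every buffer
  // has the same size (bufferSizeBytes is an assumed input, e.g. 8 MB).
  static long worstCaseBufferBytes(int cores, long bufferSizeBytes) {
    return maxBufferCount(cores) * bufferSizeBytes;
  }

  public static void main(String[] args) {
    long bufferSize = 8 * ONE_MB; // assumed write buffer size
    for (int cores : new int[] {2, 8, 64}) {
      System.out.printf("cores=%d threads=%d buffers=%d worst-case=%d MB%n",
          cores, maxConcurrentRequests(cores), maxBufferCount(cores),
          worstCaseBufferBytes(cores, bufferSize) / ONE_MB);
    }
  }
}
```

With the assumed 8 MB buffers, the formula allows 11 buffers (88 MB) on a 2-core host but 321 buffers (2568 MB) on a 64-core host. On its own, the formula takes no account of how much heap the process actually has or whether it shares those cores with other work.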


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org



steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to 
prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common 
across all AbfsOutputStream instances
URL: https://github.com/apache/hadoop/pull/1890#discussion_r394372525
 
 

 ##
 File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 ##
 @@ -63,20 +70,53 @@
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
-  private final int maxConcurrentRequestCount;
+
+  private static int maxConcurrentRequestcount;
+  private static int maxBufferCount;
 
   private ConcurrentLinkedDeque writeOperations;
-  private final ThreadPoolExecutor threadExecutor;
-  private final ExecutorCompletionService completionService;
+  private static final Object INIT_LOCK = new Object();
+  private static ThreadPoolExecutor threadExecutor;
+  private static ExecutorCompletionService completionService;
+
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int HUNDRED_MB = 100 * ONE_MB;
+  private static final int MIN_MEMORY_THRESHOLD = HUNDRED_MB;
 
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
    * blocks. After the data is sent to the service, the buffer is returned
    * back to the queue
    */
-  private final ElasticByteBufferPool byteBufferPool
-  = new ElasticByteBufferPool();
+  private static final ElasticByteBufferPool BYTE_BUFFER_POOL
+  = new ElasticByteBufferPool();
+  private static AtomicInteger buffersToBeReturned = new AtomicInteger(0);
+
+  static {
+    if (threadExecutor == null) {
+      synchronized (INIT_LOCK) {
+        if (threadExecutor == null) {
+          int availableProcessors = Runtime.getRuntime().availableProcessors();
+          maxConcurrentRequestcount = 4 * availableProcessors;
+          maxBufferCount = maxConcurrentRequestcount + availableProcessors + 1;
+          ThreadFactory deamonThreadFactory = new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable runnable) {
+              Thread deamonThread = Executors.defaultThreadFactory()
+                  .newThread(runnable);
+              deamonThread.setDaemon(true);
+              return deamonThread;
+            }
+          };
+          threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestcount,
 
 Review comment:
   Use HadoopExecutors for executors if possible.
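Below is a stdlib-only sketch of the shared pool, built with a named daemon thread factory and a bounded queue. The class and method names are hypothetical. The bounded ArrayBlockingQueue with CallerRunsPolicy is a throttling choice swapped in for illustration and is not something in the patch. With Hadoop on the classpath, the same ThreadFactory could presumably be passed to one of the HadoopExecutors factory methods (as far as I know they mirror java.util.concurrent.Executors). Those factory methods would likely bring their own, unbounded queue, though.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the patch's code.
final class DaemonPoolSketch {
  // Names each pool thread and marks it as a daemon, so an idle shared pool
  // never keeps the JVM alive. Replaces the anonymous ThreadFactory.
  static ThreadFactory daemonThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
      t.setDaemon(true);
      return t;
    };
  }

  // Fixed-size pool with a bounded work queue. When the queue is full, the
  // submitting thread runs the task itself instead of queueing more work.
  static ExecutorService newBoundedDaemonPool(int threads, int queueCapacity,
      String prefix) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        daemonThreadFactory(prefix),
        new ThreadPoolExecutor.CallerRunsPolicy());
    // Let idle threads exit after 60s so an unused shared pool holds no threads.
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
```

With CallerRunsPolicy, a writer that outpaces the uploads performs an upload on its own thread. This slows producers down rather than letting queued work, and the buffers that work holds, grow without limit.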


steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to 
prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common 
across all AbfsOutputStream instances
URL: https://github.com/apache/hadoop/pull/1890#discussion_r394373334
 
 

 ##
 File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 ##
 @@ -63,20 +70,53 @@
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
-  private final int maxConcurrentRequestCount;
+
+  private static int maxConcurrentRequestcount;
+  private static int maxBufferCount;
 
   private ConcurrentLinkedDeque writeOperations;
-  private final ThreadPoolExecutor threadExecutor;
-  private final ExecutorCompletionService completionService;
+  private static final Object INIT_LOCK = new Object();
+  private static ThreadPoolExecutor threadExecutor;
+  private static ExecutorCompletionService completionService;
+
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int HUNDRED_MB = 100 * ONE_MB;
+  private static final int MIN_MEMORY_THRESHOLD = HUNDRED_MB;
 
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
    * blocks. After the data is sent to the service, the buffer is returned
    * back to the queue
    */
-  private final ElasticByteBufferPool byteBufferPool
-  = new ElasticByteBufferPool();
+  private static final ElasticByteBufferPool BYTE_BUFFER_POOL
 
 Review comment:
   ElasticByteBufferPool is trouble because it never frees cached buffers.
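One way around that is sketched here as a hypothetical replacement, not as an existing Hadoop class. It is a pool of fixed-size buffers that caches at most maxCached of them. Any extra buffer handed back is dropped, so the garbage collector can reclaim it. The API is deliberately simpler than ElasticByteBufferPool's and does not mirror it.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a buffer cache with a hard cap on retained buffers.
final class BoundedByteBufferPoolSketch {
  private final int bufferSize;
  private final BlockingQueue<ByteBuffer> cache;

  BoundedByteBufferPoolSketch(int bufferSize, int maxCached) {
    this.bufferSize = bufferSize;
    this.cache = new ArrayBlockingQueue<>(maxCached);
  }

  // Reuse a cached buffer if one is available; otherwise allocate a new heap buffer.
  ByteBuffer getBuffer() {
    ByteBuffer b = cache.poll();
    if (b == null) {
      return ByteBuffer.allocate(bufferSize);
    }
    b.clear(); // reset position and limit before reuse
    return b;
  }

  // offer() returns false instead of blocking when the cache is full. The
  // rejected buffer becomes garbage once the caller drops its reference.
  void putBuffer(ByteBuffer b) {
    if (b.capacity() == bufferSize) {
      cache.offer(b);
    }
  }

  int cachedCount() {
    return cache.size();
  }
}
```

This only bounds what the cache retains. Bounding how many buffers are in flight at once would need something extra, such as a java.util.concurrent.Semaphore that is acquired before getBuffer() and released after putBuffer().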


steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to 
prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common 
across all AbfsOutputStream instances
URL: https://github.com/apache/hadoop/pull/1890#discussion_r394371435
 
 

 ##
 File path: 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
 ##
 @@ -207,6 +210,70 @@ public Void call() throws Exception {
     assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
   }
 
+  @Test
+  public void testWriteWithMultipleOutputStreamAtTheSameTime()
+      throws IOException, InterruptedException, ExecutionException {
+    AzureBlobFileSystem fs = getFileSystem();
+    String testFilePath = methodName.getMethodName();
+    Path[] testPaths = new Path[CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT];
+    createNStreamsAndWriteDifferentSizesConcurrently(fs, testFilePath,
+        CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths);
+    assertSuccessfulWritesOnAllStreams(fs,
+        CONCURRENT_STREAM_OBJS_TEST_OBJ_COUNT, testPaths);
+  }
+
+  private void assertSuccessfulWritesOnAllStreams(final FileSystem fs,
+      final int numConcurrentObjects, final Path[] testPaths)
+      throws IOException {
+    for (int i = 0; i < numConcurrentObjects; i++) {
+      FileStatus fileStatus = fs.getFileStatus(testPaths[i]);
+      int numWritesMadeOnStream = i + 1;
+      long expectedLength = TEST_BUFFER_SIZE * numWritesMadeOnStream;
+      assertThat(fileStatus.getLen(), is(equalTo(expectedLength)));
+    }
+  }
+
+  private void createNStreamsAndWriteDifferentSizesConcurrently(
+      final FileSystem fs, final String testFilePath,
+      final int numConcurrentObjects, final Path[] testPaths)
+      throws ExecutionException, InterruptedException {
+    final byte[] b = new byte[TEST_BUFFER_SIZE];
+    new Random().nextBytes(b);
+    final ExecutorService es = Executors.newFixedThreadPool(40);
+    final List<Future<Void>> futureTasks = new ArrayList<>();
+    for (int i = 0; i < numConcurrentObjects; i++) {
+      Path testPath = new Path(testFilePath + i);
+      testPaths[i] = testPath;
+      int numWritesToBeDone = i + 1;
+      futureTasks.add(es.submit(() -> {
+        try (FSDataOutputStream stream = fs.create(testPath)) {
+          makeNWritesToStream(stream, numWritesToBeDone, b, es);
+        }
+        return null;
+      }));
+    }
+    for (Future futureTask : futureTasks) {
+      futureTask.get();
+    }
+    es.shutdownNow();
+  }
+
+  private void makeNWritesToStream(final FSDataOutputStream stream,
+      final int numWrites, final byte[] b, final ExecutorService es)
+      throws IOException, ExecutionException, InterruptedException {
+    final List<Future<Void>> futureTasks = new ArrayList<>();
+    for (int i = 0; i < numWrites; i++) {
+      futureTasks.add(es.submit(() -> {
+        stream.write(b);
+        return null;
+      }));
+    }
+    for (Future futureTask : futureTasks) {
 
 Review comment:
   I'm sure there is a way to block on multiple futures at once. See also
 FutureIOSupport.awaitFuture.
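ExecutorService.invokeAll is one standard way to do this for the test loops above. It takes the whole list of Callables and returns only when every task has completed, and the returned futures are in the same order as the input tasks. Because invokeAll takes Callables rather than already-submitted futures, the loops would need to build their task lists first. The helper name below is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the patch.
final class AwaitAllSketch {
  // invokeAll blocks until every task has finished, so the manual
  // "submit each task, then loop calling get()" pattern becomes one call.
  // Calling get() on each completed future then rethrows any task failure
  // wrapped in an ExecutionException.
  static <T> List<T> runAll(ExecutorService es,
      List<? extends Callable<T>> tasks)
      throws InterruptedException, ExecutionException {
    List<T> results = new ArrayList<>(tasks.size());
    for (Future<T> f : es.invokeAll(tasks)) {
      results.add(f.get());
    }
    return results;
  }
}
```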


steveloughran commented on a change in pull request #1890: HADOOP-16854 Fix to 
prevent OutOfMemoryException and Make the threadpool and bytebuffer pool common 
across all AbfsOutputStream instances
URL: https://github.com/apache/hadoop/pull/1890#discussion_r394358730
 
 

 ##
 File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 ##
 @@ -63,20 +70,53 @@
   private final int bufferSize;
   private byte[] buffer;
   private int bufferIndex;
-  private final int maxConcurrentRequestCount;
+
+  private static int maxConcurrentRequestcount;
+  private static int maxBufferCount;
 
   private ConcurrentLinkedDeque writeOperations;
-  private final ThreadPoolExecutor threadExecutor;
-  private final ExecutorCompletionService completionService;
+  private static final Object INIT_LOCK = new Object();
+  private static ThreadPoolExecutor threadExecutor;
+  private static ExecutorCompletionService completionService;
+
+  private static final int ONE_MB = 1024 * 1024;
+  private static final int HUNDRED_MB = 100 * ONE_MB;
+  private static final int MIN_MEMORY_THRESHOLD = HUNDRED_MB;
 
   /**
    * Queue storing buffers with the size of the Azure block ready for
    * reuse. The pool allows reusing the blocks instead of allocating new
    * blocks. After the data is sent to the service, the buffer is returned
    * back to the queue
    */
-  private final ElasticByteBufferPool byteBufferPool
-  = new ElasticByteBufferPool();
+  private static final ElasticByteBufferPool BYTE_BUFFER_POOL
+  = new ElasticByteBufferPool();
+  private static AtomicInteger buffersToBeReturned = new AtomicInteger(0);
+
+  static {
+    if (threadExecutor == null) {
+      synchronized (INIT_LOCK) {
+        if (threadExecutor == null) {
+          int availableProcessors = Runtime.getRuntime().availableProcessors();
 
 Review comment:
   This makes some big assumptions about exclusive access to CPUs. Why the
 specific choice of 4 * cores?
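The sketch below makes those numbers explicit. It reads the thread count and a buffer memory budget from configuration, falls back to the CPU-derived defaults only when nothing is set, and caps the buffer count by the budget. The key names are invented for illustration and are not real ABFS options. A plain Map stands in for Hadoop's Configuration, whose getInt/getLong accessors would be the natural fit.

```java
import java.util.Map;

// Hypothetical sketch: the key names below are invented, not ABFS options.
final class ConfigurableSizingSketch {
  static final String THREADS_KEY =
      "example.abfs.write.max.concurrent.requests";
  static final String MEMORY_BUDGET_KEY =
      "example.abfs.write.buffer.memory.budget.bytes";

  // Use the configured thread count; fall back to the patch's 4 * cores
  // only when nothing is set.
  static int maxConcurrentRequests(Map<String, String> conf,
      int availableProcessors) {
    String v = conf.get(THREADS_KEY);
    return v != null ? Integer.parseInt(v.trim()) : 4 * availableProcessors;
  }

  // Same "threads + cores + 1" shape as the patch, but capped so that
  // buffers * bufferSize stays within the configured memory budget.
  static int maxBufferCount(Map<String, String> conf,
      int availableProcessors, int bufferSize) {
    int wanted = maxConcurrentRequests(conf, availableProcessors)
        + availableProcessors + 1;
    String budget = conf.get(MEMORY_BUDGET_KEY);
    if (budget == null) {
      return wanted;
    }
    // Allow at least one buffer, even when the budget is tiny.
    long affordable = Math.max(1L, Long.parseLong(budget.trim()) / bufferSize);
    return (int) Math.min(wanted, affordable);
  }
}
```

With 8 cores and nothing configured, this reproduces the patch's 32 threads and 41 buffers. Setting the thread count to 10 and a 64 MB budget with 8 MB buffers caps the pool at 8 buffers.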

