lakshmi-manasa-g commented on a change in pull request #1542: URL: https://github.com/apache/samza/pull/1542#discussion_r728555630
########## File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java ########## @@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() { // call async stageblock and add to future @Override public void run() { - int attemptCount = 0; byte[] compressedLocalByte = compression.compress(localByte); int blockSize = compressedLocalByte.length; - while (attemptCount < MAX_ATTEMPT) { - try { - ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize); - metrics.updateCompressByteMetrics(blockSize); - LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize); - metrics.updateAzureUploadMetrics(); - // StageBlock generates exception on Failure. - stageBlock(blockIdEncoded, outputStream, blockSize); - break; - } catch (InterruptedException e) { Review comment: Yes, an InterruptedException can be thrown while sending the request; in that case it is simply logged and bubbled up. That part of the InterruptedException handling has not changed in this PR. ########## File path: samza-azure/src/main/java/org/apache/samza/system/azureblob/avro/AzureBlobOutputStream.java ########## @@ -322,33 +321,20 @@ private synchronized void uploadBlockAsync() { // call async stageblock and add to future @Override public void run() { - int attemptCount = 0; byte[] compressedLocalByte = compression.compress(localByte); int blockSize = compressedLocalByte.length; - while (attemptCount < MAX_ATTEMPT) { - try { - ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize); - metrics.updateCompressByteMetrics(blockSize); - LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize); - metrics.updateAzureUploadMetrics(); - // StageBlock generates exception on Failure. 
- stageBlock(blockIdEncoded, outputStream, blockSize); - break; - } catch (InterruptedException e) { - String msg = String.format("Upload block for blob: %s failed for blockid: %s due to InterruptedException.", - blobAsyncClient.getBlobUrl().toString(), blockId); - LOG.error(msg, e); - throw new AzureException("InterruptedException encountered during block upload. Will not retry.", e); - } catch (Exception e) { - attemptCount += 1; - String msg = "Upload block for blob: " + blobAsyncClient.getBlobUrl().toString() + " failed for blockid: " + blockId + " due to exception. AttemptCount: " + attemptCount; - LOG.error(msg, e); - if (attemptCount == MAX_ATTEMPT) { - throw new AzureException("Exceeded number of attempts. Max attempts is: " + MAX_ATTEMPT, e); - } - } + try { + ByteBuffer outputStream = ByteBuffer.wrap(compressedLocalByte, 0, blockSize); + metrics.updateCompressByteMetrics(blockSize); + LOG.info("{} Upload block start for blob: {} for block size:{}.", blobAsyncClient.getBlobUrl().toString(), blockId, blockSize); + metrics.updateAzureUploadMetrics(); Review comment: Doing so now would actually make this change backwards incompatible; the errors for upload are tracked separately [here](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/azureblob/AzureBlobBasicMetrics.java#L46) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org