ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r164253587
 
 

 ##########
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##########
 @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext 
peerContext, final HostAndPort peer
 
     Status lastStatus = status, currentStatus = status;
     final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-    while (true) {
-      // Set some trace info
-      span = Trace.start("Replicate WAL batch");
-      span.data("Batch size (bytes)", Long.toString(sizeLimit));
-      span.data("File", p.toString());
-      span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
-      span.data("Peer tserver", peerTserver.toString());
-      span.data("Remote table ID", remoteTableId);
-
-      ReplicationStats replResult;
-      try {
-        // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerContext, 
peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids), timeout);
-      } catch (Exception e) {
-        log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
-        throw e;
-      } finally {
-        span.stop();
-      }
-
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
-
-      currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+    try {
+      while (true) {
+        // Set some trace info
+        span = Trace.start("Replicate WAL batch");
+        span.data("Batch size (bytes)", Long.toString(sizeLimit));
+        span.data("File", p.toString());
+        span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
+        span.data("Peer tserver", peerTserver.toString());
+        span.data("Remote table ID", remoteTableId);
+
+        ReplicationStats replResult;
+        try {
+          // Read and send a batch of mutations
+          replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+                  remoteTableId, tcreds, tids), timeout);
+        } catch (Exception e) {
+          log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
+          throw e;
+        } finally {
+          span.stop();
+        }
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        span = Trace.start("Update replication table");
-        try {
-          if (null != accumuloUgi) {
-            final Status copy = currentStatus;
-            accumuloUgi.doAs(new PrivilegedAction<Void>() {
-              @Override
-              public Void run() {
-                try {
-                  helper.recordNewStatus(p, copy, target);
-                } catch (Exception e) {
-                  exceptionRef.set(e);
+        currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+        log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
+
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          span = Trace.start("Update replication table");
+          try {
+            if (null != accumuloUgi) {
+              final Status copy = currentStatus;
+              accumuloUgi.doAs(new PrivilegedAction<Void>() {
+                @Override
+                public Void run() {
+                  try {
+                    helper.recordNewStatus(p, copy, target);
+                  } catch (Exception e) {
+                    exceptionRef.set(e);
+                  }
+                  return null;
+                }
+              });
+              Exception e = exceptionRef.get();
+              if (null != e) {
+                if (e instanceof TableNotFoundException) {
+                  throw (TableNotFoundException) e;
+                } else if (e instanceof AccumuloSecurityException) {
+                  throw (AccumuloSecurityException) e;
+                } else if (e instanceof AccumuloException) {
+                  throw (AccumuloException) e;
+                } else {
+                  throw new RuntimeException("Received unexpected exception", 
e);
                 }
-                return null;
-              }
-            });
-            Exception e = exceptionRef.get();
-            if (null != e) {
-              if (e instanceof TableNotFoundException) {
-                throw (TableNotFoundException) e;
-              } else if (e instanceof AccumuloSecurityException) {
-                throw (AccumuloSecurityException) e;
-              } else if (e instanceof AccumuloException) {
-                throw (AccumuloException) e;
-              } else {
-                throw new RuntimeException("Received unexpected exception", e);
               }
+            } else {
+              helper.recordNewStatus(p, currentStatus, target);
             }
-          } else {
-            helper.recordNewStatus(p, currentStatus, target);
+          } catch (TableNotFoundException e) {
+            log.error("Tried to update status in replication table for {} as 
{}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+            throw new RuntimeException("Replication table did not exist, will 
retry", e);
+          } finally {
+            span.stop();
           }
-        } catch (TableNotFoundException e) {
-          log.error("Tried to update status in replication table for {} as {}, 
but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
-          throw new RuntimeException("Replication table did not exist, will 
retry", e);
-        } finally {
-          span.stop();
-        }
 
-        log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
+          log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
 
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
-        // we can just not record any updates, and it will be picked up again 
by the work assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
+          // we can just not record any updates, and it will be picked up 
again by the work assigner
+          return status;
+        }
+      }
+    } finally {
+      try {
+        input.close();
 
 Review comment:
   It might be better to allocate input in a try-with-resources block instead. 
That way, it is always closed, even if the method returns normally before it 
gets to this while(true) loop (I noticed another try-catch above which returns, 
and leaves the input unclosed).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to