keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167342746
 
 

 ##########
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##########
 @@ -321,60 +321,67 @@ public String execute(ReplicationCoordinator.Client 
client) throws Exception {
   protected Status replicateRFiles(ClientContext peerContext, final 
HostAndPort peerTserver, final ReplicationTarget target, final Path p, final 
Status status,
       final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, long timeout) throws 
TTransportException,
       AccumuloException, AccumuloSecurityException {
-    DataInputStream input;
-    try {
-      input = getRFileInputStream(p);
-    } catch (IOException e) {
-      log.error("Could not create input stream from RFile, will retry", e);
-      return status;
-    }
-
-    Status lastStatus = status, currentStatus = status;
-    while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+    try (final DataInputStream input = getRFileInputStream(p)) {
+      Status lastStatus = status, currentStatus = status;
+      while (true) {
+        // Read and send a batch of mutations
+        ReplicationStats replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
RFileClientExecReturn(target, input, p,
+            currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+        currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+        log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
-        // we can just not record any updates, and it will be picked up again 
by the work assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
+          // we can just not record any updates, and it will be picked up 
again by the work assigner
+          return status;
+        }
       }
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
     }
   }
 
   protected Status replicateLogs(ClientContext peerContext, final HostAndPort 
peerTserver, final ReplicationTarget target, final Path p, final Status status,
       final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, final UserGroupInformation 
accumuloUgi,
       long timeout) throws TTransportException, AccumuloException, 
AccumuloSecurityException {
-
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
-    final DataInputStream input;
-    Span span = Trace.start("Read WAL header");
-    span.data("file", p.toString());
-    try {
-      input = getWalStream(p);
+    try (final DataInputStream input = getWalStream(p)) {
 
 Review comment:
   The call to `getWalStream()` calls `DfsLogger.readHeaderAndReturnStream()` 
which calls `fs.open()`.  If there is an IOException in 
`readHeaderAndReturnStream()` or in `getWalStream()` then it seems like the 
FSDataInputStream will not be closed.
   
   Not sure what the best way to handle this is, but one possibility I thought 
of is to refactor the code to call `fs.open()` in this method and then pass 
that down.  Maybe something like
   
   ```java
   try(FSDataInputStream fsdi = fs.open(path)){
      DataInputStream input = getWalStream(fsdi);
   }
   ```
   
   or could do the following; I think Java will close the resources in the 
reverse of the order in which they were opened.
   
   ```java
   try(FSDataInputStream fsdi = fs.open(path); DataInputStream input = 
getWalStream(fsdi)){
    
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to