This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.11.x by this 
push:
     new d2afefe  NIFI-7114: Fix file leaks in StandardCommsSession and S2S 
Reporting components
d2afefe is described below

commit d2afefe009074ff3cd3c3655e516b0137207ba25
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Feb 21 16:20:38 2020 -0500

    NIFI-7114: Fix file leaks in StandardCommsSession and S2S Reporting 
components
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../nifi/reporting/SiteToSiteBulletinReportingTask.java  | 14 +++++++++++---
 .../nifi/reporting/SiteToSiteMetricsReportingTask.java   | 12 ++++++++++--
 .../reporting/SiteToSiteProvenanceReportingTask.java     | 14 +++++++++++---
 .../nifi/reporting/SiteToSiteStatusReportingTask.java    | 14 +++++++++++---
 .../reporting/sink/SiteToSiteReportingRecordSink.java    | 16 ++++++++++------
 .../distributed/cache/client/StandardCommsSession.java   |  2 ++
 6 files changed, 55 insertions(+), 17 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 3e07759..1e68687 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -137,11 +137,12 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
         final JsonArray jsonArray = arrayBuilder.build();
 
         // Send the JSON document for the current batch
+        Transaction transaction = null;
         try {
             // Lazily create SiteToSiteClient to provide a StateManager
             setup(context);
 
-            final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+            transaction = 
getClient().createTransaction(TransferDirection.SEND);
             if (transaction == null) {
                 getLogger().info("All destination nodes are penalized; will 
attempt to send data later");
                 return;
@@ -162,8 +163,15 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
             final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             getLogger().info("Successfully sent {} Bulletins to destination in 
{} ms; Transaction ID = {}; First Event ID = {}",
                     new Object[]{bulletins.size(), transferMillis, 
transactionId, bulletins.get(0).getId()});
-        } catch (final IOException e) {
-            throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);
+        } catch (final Exception e) {
+            if (transaction != null) {
+                transaction.error();
+            }
+            if (e instanceof ProcessException) {
+                throw (ProcessException) e;
+            } else {
+                throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);
+            }
         }
 
         lastSentBulletinId = currMaxId;
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
index e781dfa..898bbbd 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -191,12 +191,13 @@ public class SiteToSiteMetricsReportingTask extends 
AbstractSiteToSiteReportingT
                 data = getData(context, new 
ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)),
 attributes);
             }
 
+            Transaction transaction = null;
             try {
                 // Lazily create SiteToSiteClient to provide a StateManager
                 setup(context);
 
                 long start = System.nanoTime();
-                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+                transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
                     getLogger().debug("All destination nodes are penalized; 
will attempt to send data later");
                     return;
@@ -215,7 +216,14 @@ public class SiteToSiteMetricsReportingTask extends 
AbstractSiteToSiteReportingT
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                 getLogger().info("Successfully sent metrics to destination in 
{}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
             } catch (final Exception e) {
-                throw new ProcessException("Failed to send metrics to 
destination due to:" + e.getMessage(), e);
+                if (transaction != null) {
+                    transaction.error();
+                }
+                if (e instanceof ProcessException) {
+                    throw (ProcessException) e;
+                } else {
+                    throw new ProcessException("Failed to send metrics to 
destination due to:" + e.getMessage(), e);
+                }
             }
 
         } else {
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 0ff507c..d382ca3 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -304,11 +304,12 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
             final JsonArray jsonArray = arrayBuilder.build();
 
             // Send the JSON document for the current batch
+            Transaction transaction = null;
             try {
                 // Lazily create SiteToSiteClient to provide a StateManager
                 setup(context);
 
-                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+                transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
                     // Throw an exception to avoid provenance event id will 
not proceed so that those can be consumed again.
                     throw new ProcessException("All destination nodes are 
penalized; will attempt to send data later");
@@ -329,8 +330,15 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                 getLogger().info("Successfully sent {} Provenance Events to 
destination in {} ms; Transaction ID = {}; First Event ID = {}",
                         new Object[] {events.size(), transferMillis, 
transactionId, events.get(0).getEventId()});
-            } catch (final IOException e) {
-                throw new ProcessException("Failed to send Provenance Events 
to destination due to IOException:" + e.getMessage(), e);
+            } catch (final Exception e) {
+                if (transaction != null) {
+                    transaction.error();
+                }
+                if (e instanceof ProcessException) {
+                    throw (ProcessException) e;
+                } else {
+                    throw new ProcessException("Failed to send Provenance 
Events to destination due to IOException:" + e.getMessage(), e);
+                }
             }
         });
 
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 31009f8..7464b43 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -161,12 +161,13 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
 
         while(!jsonBatch.isEmpty()) {
             // Send the JSON document for the current batch
+            Transaction transaction = null;
             try {
                 // Lazily create SiteToSiteClient to provide a StateManager
                 setup(context);
 
                 long start = System.nanoTime();
-                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+                transaction = 
getClient().createTransaction(TransferDirection.SEND);
                 if (transaction == null) {
                     getLogger().debug("All destination nodes are penalized; 
will attempt to send data later");
                     return;
@@ -197,8 +198,15 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
                 fromIndex = toIndex;
                 toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
                 jsonBatch = jsonArray.subList(fromIndex, toIndex);
-            } catch (final IOException e) {
-                throw new ProcessException("Failed to send Status Records to 
destination due to IOException:" + e.getMessage(), e);
+            } catch (final Exception e) {
+                if (transaction != null) {
+                    transaction.error();
+                }
+                if (e instanceof ProcessException) {
+                    throw (ProcessException) e;
+                } else {
+                    throw new ProcessException("Failed to send Status Records 
to destination due to IOException:" + e.getMessage(), e);
+                }
             }
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index 20c501e..b2ed107 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -135,10 +135,10 @@ public class SiteToSiteReportingRecordSink extends 
AbstractControllerService imp
 
     @Override
     public WriteResult sendData(final RecordSet recordSet, final 
Map<String,String> attributes, final boolean sendZeroResults) throws 
IOException {
-
+        Transaction transaction = null;
         try {
             WriteResult writeResult = null;
-            final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+            transaction = 
getClient().createTransaction(TransferDirection.SEND);
             if (transaction == null) {
                 getLogger().info("All destination nodes are penalized; will 
attempt to send data later");
             } else {
@@ -166,12 +166,16 @@ public class SiteToSiteReportingRecordSink extends 
AbstractControllerService imp
                 }
             }
             return writeResult;
-        } catch(IOException ioe) {
-            throw ioe;
         } catch (Exception e) {
-            throw new IOException("Failed to write metrics using record 
writer: " + e.getMessage(), e);
+            if (transaction != null) {
+                transaction.error();
+            }
+            if (e instanceof IOException) {
+                throw (IOException) e;
+            } else {
+                throw new IOException("Failed to write metrics using record 
writer: " + e.getMessage(), e);
+            }
         }
-
     }
 
     @OnDisabled
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index d157161..56e3389 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -71,6 +71,8 @@ public class StandardCommsSession implements CommsSession {
     @Override
     public void close() throws IOException {
         socketChannel.close();
+        bufferedIn.close();
+        bufferedOut.close();
     }
 
     @Override

Reply via email to