This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.11.x by this
push:
new d2afefe NIFI-7114: Fix file leaks in StandardCommsSession and S2S
Reporting components
d2afefe is described below
commit d2afefe009074ff3cd3c3655e516b0137207ba25
Author: Matthew Burgess <[email protected]>
AuthorDate: Fri Feb 21 16:20:38 2020 -0500
NIFI-7114: Fix file leaks in StandardCommsSession and S2S Reporting
components
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi/reporting/SiteToSiteBulletinReportingTask.java | 14 +++++++++++---
.../nifi/reporting/SiteToSiteMetricsReportingTask.java | 12 ++++++++++--
.../reporting/SiteToSiteProvenanceReportingTask.java | 14 +++++++++++---
.../nifi/reporting/SiteToSiteStatusReportingTask.java | 14 +++++++++++---
.../reporting/sink/SiteToSiteReportingRecordSink.java | 16 ++++++++++------
.../distributed/cache/client/StandardCommsSession.java | 2 ++
6 files changed, 55 insertions(+), 17 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 3e07759..1e68687 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -137,11 +137,12 @@ public class SiteToSiteBulletinReportingTask extends
AbstractSiteToSiteReporting
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
+ Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
- final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ transaction =
getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will
attempt to send data later");
return;
@@ -162,8 +163,15 @@ public class SiteToSiteBulletinReportingTask extends
AbstractSiteToSiteReporting
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Bulletins to destination in
{} ms; Transaction ID = {}; First Event ID = {}",
new Object[]{bulletins.size(), transferMillis,
transactionId, bulletins.get(0).getId()});
- } catch (final IOException e) {
- throw new ProcessException("Failed to send Bulletins to
destination due to IOException:" + e.getMessage(), e);
+ } catch (final Exception e) {
+ if (transaction != null) {
+ transaction.error();
+ }
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException("Failed to send Bulletins to
destination due to IOException:" + e.getMessage(), e);
+ }
}
lastSentBulletinId = currMaxId;
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
index e781dfa..898bbbd 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -191,12 +191,13 @@ public class SiteToSiteMetricsReportingTask extends
AbstractSiteToSiteReportingT
data = getData(context, new
ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)),
attributes);
}
+ Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
- final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ transaction =
getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized;
will attempt to send data later");
return;
@@ -215,7 +216,14 @@ public class SiteToSiteMetricsReportingTask extends
AbstractSiteToSiteReportingT
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent metrics to destination in
{}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
} catch (final Exception e) {
- throw new ProcessException("Failed to send metrics to
destination due to:" + e.getMessage(), e);
+ if (transaction != null) {
+ transaction.error();
+ }
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException("Failed to send metrics to
destination due to:" + e.getMessage(), e);
+ }
}
} else {
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 0ff507c..d382ca3 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -304,11 +304,12 @@ public class SiteToSiteProvenanceReportingTask extends
AbstractSiteToSiteReporti
final JsonArray jsonArray = arrayBuilder.build();
// Send the JSON document for the current batch
+ Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
- final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ transaction =
getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
// Throw an exception to avoid provenance event id will
not proceed so that those can be consumed again.
throw new ProcessException("All destination nodes are
penalized; will attempt to send data later");
@@ -329,8 +330,15 @@ public class SiteToSiteProvenanceReportingTask extends
AbstractSiteToSiteReporti
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().info("Successfully sent {} Provenance Events to
destination in {} ms; Transaction ID = {}; First Event ID = {}",
new Object[] {events.size(), transferMillis,
transactionId, events.get(0).getEventId()});
- } catch (final IOException e) {
- throw new ProcessException("Failed to send Provenance Events
to destination due to IOException:" + e.getMessage(), e);
+ } catch (final Exception e) {
+ if (transaction != null) {
+ transaction.error();
+ }
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException("Failed to send Provenance
Events to destination due to IOException:" + e.getMessage(), e);
+ }
}
});
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 31009f8..7464b43 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -161,12 +161,13 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
while(!jsonBatch.isEmpty()) {
// Send the JSON document for the current batch
+ Transaction transaction = null;
try {
// Lazily create SiteToSiteClient to provide a StateManager
setup(context);
long start = System.nanoTime();
- final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ transaction =
getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().debug("All destination nodes are penalized;
will attempt to send data later");
return;
@@ -197,8 +198,15 @@ public class SiteToSiteStatusReportingTask extends
AbstractSiteToSiteReportingTa
fromIndex = toIndex;
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
- } catch (final IOException e) {
- throw new ProcessException("Failed to send Status Records to
destination due to IOException:" + e.getMessage(), e);
+ } catch (final Exception e) {
+ if (transaction != null) {
+ transaction.error();
+ }
+ if (e instanceof ProcessException) {
+ throw (ProcessException) e;
+ } else {
+ throw new ProcessException("Failed to send Status Records
to destination due to IOException:" + e.getMessage(), e);
+ }
}
}
}
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index 20c501e..b2ed107 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -135,10 +135,10 @@ public class SiteToSiteReportingRecordSink extends
AbstractControllerService imp
@Override
public WriteResult sendData(final RecordSet recordSet, final
Map<String,String> attributes, final boolean sendZeroResults) throws
IOException {
-
+ Transaction transaction = null;
try {
WriteResult writeResult = null;
- final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ transaction =
getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
getLogger().info("All destination nodes are penalized; will
attempt to send data later");
} else {
@@ -166,12 +166,16 @@ public class SiteToSiteReportingRecordSink extends
AbstractControllerService imp
}
}
return writeResult;
- } catch(IOException ioe) {
- throw ioe;
} catch (Exception e) {
- throw new IOException("Failed to write metrics using record
writer: " + e.getMessage(), e);
+ if (transaction != null) {
+ transaction.error();
+ }
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new IOException("Failed to write metrics using record
writer: " + e.getMessage(), e);
+ }
}
-
}
@OnDisabled
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index d157161..56e3389 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -71,6 +71,8 @@ public class StandardCommsSession implements CommsSession {
@Override
public void close() throws IOException {
socketChannel.close();
+ bufferedIn.close();
+ bufferedOut.close();
}
@Override