This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new f641a15 NIFI-6989: Fixed SiteToSiteReportingRecordSink when
transaction sent no data
f641a15 is described below
commit f641a15c5fbc3b99931989ecf05cf9df12a5a9c6
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Jan 7 21:19:48 2020 -0500
NIFI-6989: Fixed SiteToSiteReportingRecordSink when transaction sent no data
Signed-off-by: Pierre Villard <[email protected]>
This closes #3965.
---
.../sink/SiteToSiteReportingRecordSink.java | 7 +++--
.../sink/TestSiteToSiteReportingRecordSink.java | 35 +++++++++++++++++-----
2 files changed, 32 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index 5c372da..574ebd4 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -161,9 +161,11 @@ public class SiteToSiteReportingRecordSink extends
AbstractControllerService imp
if (recordCount > 0 || sendZeroResults) {
transaction.send(out.toByteArray(), attributes);
+ transaction.confirm();
+ transaction.complete();
+ } else {
+ transaction.cancel("No data to send");
}
- transaction.confirm();
- transaction.complete();
}
return writeResult;
} catch(IOException ioe) {
@@ -179,6 +181,7 @@ public class SiteToSiteReportingRecordSink extends
AbstractControllerService imp
final SiteToSiteClient client = getClient();
if (client != null) {
client.close();
+ this.siteToSiteClient = null;
}
}
diff --git
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
index 3b5535c..0c426d7 100644
---
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
+++
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
@@ -43,7 +43,6 @@ import
org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
@@ -100,6 +99,29 @@ public class TestSiteToSiteReportingRecordSink {
assertEquals("World!", data[1]);
}
+ @Test
+ public void testNoRows() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ properties.put(SiteToSiteReportingRecordSink.RECORD_WRITER_FACTORY,
"record-writer");
+ MockSiteToSiteReportingRecordSink task = initTask(properties);
+
+ List<RecordField> recordFields = Arrays.asList(
+ new RecordField("field1", RecordFieldType.INT.getDataType()),
+ new RecordField("field2", RecordFieldType.STRING.getDataType())
+ );
+ RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+ task.sendData(RecordSet.of(recordSchema), new HashMap<>(), true);
+
+ // One entry of an empty byte array
+ assertEquals(1, task.dataSent.size());
+ assertEquals(0, task.dataSent.get(0).length);
+
+ task.sendData(RecordSet.of(recordSchema), new HashMap<>(), false);
+ // Still only one entry even after two sends (toggled sendZeroResults)
+ assertEquals(1, task.dataSent.size());
+ }
+
public MockSiteToSiteReportingRecordSink initTask(Map<PropertyDescriptor,
String> customProperties) throws InitializationException, IOException {
final MockSiteToSiteReportingRecordSink task = new
MockSiteToSiteReportingRecordSink();
@@ -133,13 +155,10 @@ public class TestSiteToSiteReportingRecordSink {
final Transaction transaction = Mockito.mock(Transaction.class);
try {
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation)
throws Throwable {
- final byte[] data = invocation.getArgument(0,
byte[].class);
- dataSent.add(data);
- return null;
- }
+ Mockito.doAnswer((Answer<Object>) invocation -> {
+ final byte[] data = invocation.getArgument(0,
byte[].class);
+ dataSent.add(data);
+ return null;
}).when(transaction).send(Mockito.any(byte[].class),
Mockito.any(Map.class));
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);