This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new f641a15  NIFI-6989: Fixed SiteToSiteReportingRecordSink when transaction sent no data
f641a15 is described below

commit f641a15c5fbc3b99931989ecf05cf9df12a5a9c6
Author: Matthew Burgess <[email protected]>
AuthorDate: Tue Jan 7 21:19:48 2020 -0500

    NIFI-6989: Fixed SiteToSiteReportingRecordSink when transaction sent no data
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3965.
---
 .../sink/SiteToSiteReportingRecordSink.java        |  7 +++--
 .../sink/TestSiteToSiteReportingRecordSink.java    | 35 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
index 5c372da..574ebd4 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/sink/SiteToSiteReportingRecordSink.java
@@ -161,9 +161,11 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
 
                 if (recordCount > 0 || sendZeroResults) {
                     transaction.send(out.toByteArray(), attributes);
+                    transaction.confirm();
+                    transaction.complete();
+                } else {
+                    transaction.cancel("No data to send");
                 }
-                transaction.confirm();
-                transaction.complete();
             }
             return writeResult;
         } catch(IOException ioe) {
@@ -179,6 +181,7 @@ public class SiteToSiteReportingRecordSink extends AbstractControllerService imp
         final SiteToSiteClient client = getClient();
         if (client != null) {
             client.close();
+            this.siteToSiteClient = null;
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
index 3b5535c..0c426d7 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/sink/TestSiteToSiteReportingRecordSink.java
@@ -43,7 +43,6 @@ import org.apache.nifi.util.MockControllerServiceInitializationContext;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
@@ -100,6 +99,29 @@ public class TestSiteToSiteReportingRecordSink {
         assertEquals("World!", data[1]);
     }
 
+    @Test
+    public void testNoRows() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteReportingRecordSink.RECORD_WRITER_FACTORY, "record-writer");
+        MockSiteToSiteReportingRecordSink task = initTask(properties);
+
+        List<RecordField> recordFields = Arrays.asList(
+                new RecordField("field1", RecordFieldType.INT.getDataType()),
+                new RecordField("field2", RecordFieldType.STRING.getDataType())
+        );
+        RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+        task.sendData(RecordSet.of(recordSchema), new HashMap<>(), true);
+
+        // One entry of an empty byte array
+        assertEquals(1, task.dataSent.size());
+        assertEquals(0, task.dataSent.get(0).length);
+
+        task.sendData(RecordSet.of(recordSchema), new HashMap<>(), false);
+        // Still only one entry even after two sends (toggled sendZeroResults)
+        assertEquals(1, task.dataSent.size());
+    }
+
     public MockSiteToSiteReportingRecordSink initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
 
         final MockSiteToSiteReportingRecordSink task = new MockSiteToSiteReportingRecordSink();
@@ -133,13 +155,10 @@ public class TestSiteToSiteReportingRecordSink {
             final Transaction transaction = Mockito.mock(Transaction.class);
 
             try {
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
-                        final byte[] data = invocation.getArgument(0, byte[].class);
-                        dataSent.add(data);
-                        return null;
-                    }
+                Mockito.doAnswer((Answer<Object>) invocation -> {
+                    final byte[] data = invocation.getArgument(0, byte[].class);
+                    dataSent.add(data);
+                    return null;
                 }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
 
                 Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);

Reply via email to