This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new cf4e966  NIFI-8337: This closes #4910. Fixed bug in 
StandardProcessSession where the session didn't account for FlowFile's 
contentClaimOffset when seeking to the appropriate location in the stream.
cf4e966 is described below

commit cf4e966d912975dfeac1cbf80f9e073536405897
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 17 13:30:38 2021 -0400

    NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the 
session didn't account for FlowFile's contentClaimOffset when seeking to the 
appropriate location in the stream.
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../repository/StandardProcessSession.java         |   4 +-
 .../processors/tests/system/ReverseContents.java   |   2 +
 .../nifi/processors/tests/system/SplitByLine.java  | 126 +++++++++++++++++
 .../processors/tests/system/VerifyContents.java    | 102 ++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  23 ++++
 .../nifi/tests/system/processor/RunOnceIT.java     |   3 -
 .../tests/system/repositories/ContentAccessIT.java | 150 +++++++++++++++++++++
 8 files changed, 407 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 1d24f33..b2b1bde 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2300,9 +2300,9 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
 
                 currentReadClaim = claim.getResourceClaim();
                 final InputStream contentRepoStream = 
context.getContentRepository().read(claim.getResourceClaim());
-                StreamUtils.skip(contentRepoStream, claim.getOffset());
+                StreamUtils.skip(contentRepoStream, claim.getOffset() + 
contentClaimOffset);
                 final InputStream bufferedContentStream = new 
BufferedInputStream(contentRepoStream);
-                final ByteCountingInputStream byteCountingInputStream = new 
ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+                final ByteCountingInputStream byteCountingInputStream = new 
ByteCountingInputStream(bufferedContentStream, claim.getOffset() + 
contentClaimOffset);
                 currentReadClaimStream = byteCountingInputStream;
 
                 // Use a non-closeable stream (DisableOnCloseInputStream) 
because we want to keep it open after the callback has finished so that we can
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
index cfb1124..62c8dc2 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.tests.system;
 
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -31,6 +32,7 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.Set;
 
+@SupportsBatching
 public class ReverseContents extends AbstractProcessor {
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
new file mode 100644
index 0000000..93b8e14
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.util.TextLineDemarcator;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class SplitByLine extends AbstractProcessor {
+
+    static final PropertyDescriptor USE_CLONE = new 
PropertyDescriptor.Builder()
+        .name("Use Clone")
+        .description("Whether or not to use session.clone for generating 
children FlowFiles")
+        .required(true)
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.singletonList(USE_CLONE);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final boolean clone = context.getProperty(USE_CLONE).asBoolean();
+        if (clone) {
+            splitByClone(session, flowFile);
+        } else {
+            splitByWrite(session, flowFile);
+        }
+
+        session.remove(flowFile);
+    }
+
+    private void splitByClone(ProcessSession session, FlowFile flowFile) {
+        final List<TextLineDemarcator.OffsetInfo> offsetInfos = new 
ArrayList<>();
+
+        try (final InputStream in = session.read(flowFile);
+             final TextLineDemarcator demarcator = new TextLineDemarcator(in)) 
{
+
+            TextLineDemarcator.OffsetInfo offsetInfo;
+            while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
+                offsetInfos.add(offsetInfo);
+            }
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        for (final TextLineDemarcator.OffsetInfo offsetInfo : offsetInfos) {
+            FlowFile child = session.clone(flowFile, 
offsetInfo.getStartOffset(), offsetInfo.getLength() - 
offsetInfo.getCrlfLength());
+            session.putAttribute(child, "num.lines", 
String.valueOf(offsetInfos.size()));
+            session.transfer(child, REL_SUCCESS);
+        }
+    }
+
+    private void splitByWrite(ProcessSession session, FlowFile flowFile) {
+        final List<FlowFile> children = new ArrayList<>();
+        try (final InputStream in = session.read(flowFile);
+             final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in))) {
+
+            String line;
+            while ((line = reader.readLine()) != null) {
+                FlowFile child = session.create(flowFile);
+                children.add(child);
+
+                try (final OutputStream out = session.write(child)) {
+                    final byte[] lineBytes = 
line.getBytes(StandardCharsets.UTF_8);
+                    out.write(lineBytes);
+                }
+            }
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        for (FlowFile child : children) {
+            session.putAttribute(child, "num.lines", 
String.valueOf(children.size()));
+        }
+
+        session.transfer(children, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
new file mode 100644
index 0000000..9454e96
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class VerifyContents extends AbstractProcessor {
+    private static final Relationship REL_UNMATCHED = new 
Relationship.Builder()
+        .name("unmatched")
+        .build();
+
+    private final AtomicReference<Set<Relationship>> relationshipsRef = new 
AtomicReference<>(Collections.singleton(REL_UNMATCHED));
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .dynamic(true)
+            .addValidator(Validator.VALID)
+            .build();
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        final Relationship relationship = new Relationship.Builder()
+            .name(descriptor.getName())
+            .build();
+
+        final Set<Relationship> updatedRelationships = new 
HashSet<>(relationshipsRef.get());
+
+        if (newValue == null) {
+            updatedRelationships.remove(relationship);
+        } else {
+            updatedRelationships.add(relationship);
+        }
+
+        updatedRelationships.add(REL_UNMATCHED); // Ensure that the unmatched 
relationship is always available
+        relationshipsRef.set(updatedRelationships);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String contents;
+        try (final InputStream in = session.read(flowFile);
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+            StreamUtils.copy(in, baos);
+            contents = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+        } catch (final Exception e) {
+            throw new ProcessException(e);
+        }
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
+            final String propertyName = entry.getKey().getName();
+            if (contents.equals(entry.getValue())) {
+                getLogger().info("Routing {} to {}", flowFile, propertyName);
+                session.transfer(flowFile, new 
Relationship.Builder().name(propertyName).build());
+                return;
+            }
+        }
+
+        getLogger().info("Routing {} to {}", flowFile, REL_UNMATCHED);
+        session.transfer(flowFile, REL_UNMATCHED);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c6588f6..c0b025c 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -26,7 +26,9 @@ 
org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
 org.apache.nifi.processors.tests.system.ReverseContents
 org.apache.nifi.processors.tests.system.SetAttribute
 org.apache.nifi.processors.tests.system.Sleep
+org.apache.nifi.processors.tests.system.SplitByLine
 org.apache.nifi.processors.tests.system.TerminateFlowFile
 org.apache.nifi.processors.tests.system.ThrowProcessException
 org.apache.nifi.processors.tests.system.ValidateFileExists
+org.apache.nifi.processors.tests.system.VerifyContents
 org.apache.nifi.processors.tests.system.WriteToFile
\ No newline at end of file
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7616a29..feb7bda 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -82,8 +83,10 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -261,6 +264,12 @@ public class NiFiClientUtil {
         return updateProcessorConfig(currentEntity, config);
     }
 
+    public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity 
currentEntity, final int runDuration) throws NiFiClientException, IOException {
+        final ProcessorConfigDTO config = new ProcessorConfigDTO();
+        config.setRunDurationMillis((long) runDuration);
+        return updateProcessorConfig(currentEntity, config);
+    }
+
     public ProcessorEntity updateProcessorSchedulingPeriod(final 
ProcessorEntity currentEntity, final String schedulingPeriod) throws 
NiFiClientException, IOException {
         final ProcessorConfigDTO config = new ProcessorConfigDTO();
         config.setSchedulingPeriod(schedulingPeriod);
@@ -776,6 +785,20 @@ public class NiFiClientUtil {
         return flowFileEntity;
     }
 
+    public String getFlowFileContentAsUtf8(final String connectionId, final 
int flowFileIndex) throws NiFiClientException, IOException {
+        final byte[] contents = getFlowFileContentAsByteArray(connectionId, 
flowFileIndex);
+        return new String(contents, StandardCharsets.UTF_8);
+    }
+
+    public byte[] getFlowFileContentAsByteArray(final String connectionId, 
final int flowFileIndex) throws NiFiClientException, IOException {
+        try (final InputStream in = getFlowFileContent(connectionId, 
flowFileIndex);
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+            StreamUtils.copy(in, baos);
+            return baos.toByteArray();
+        }
+    }
+
     public InputStream getFlowFileContent(final String connectionId, final int 
flowFileIndex) throws NiFiClientException, IOException {
         final ListingRequestEntity listing = performQueueListing(connectionId);
         final List<FlowFileSummaryDTO> flowFileSummaries = 
listing.getListingRequest().getFlowFileSummaries();
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
index 3d2507f..c47523f 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
@@ -30,7 +30,6 @@ public class RunOnceIT extends NiFiSystemIT {
 
     @Test
     public void testRunOnce() throws NiFiClientException, IOException, 
InterruptedException {
-        // GIVEN
         ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
         getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
 
@@ -38,10 +37,8 @@ public class RunOnceIT extends NiFiSystemIT {
 
         ConnectionEntity generateToTerminate = 
getClientUtil().createConnection(generate, terminate, "success");
 
-        // WHEN
         getNifiClient().getProcessorClient().runProcessorOnce(generate);
 
-        // THEN
         waitForQueueCount(generateToTerminate.getId(), 1);
 
         ProcessorEntity actualGenerate = 
getNifiClient().getProcessorClient().getProcessor(generate.getId());
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
new file mode 100644
index 0000000..1a3790f
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test is intended to verify that Processors are able to access the 
content that their FlowFiles represent in several different situations.
+ * We test for things like splitting a FlowFile by creating multiple children 
and writing to them, as well as creating children via 
processSession.clone(FlowFile flowFile, long offset, long length);
+ * We also test against Run Duration of 0 ms vs. 25 milliseconds in order to 
test when a processor writes the contents to multiple FlowFiles in the same 
session (which will result in writing to the
+ * same Content Claim) as well as writing to multiple FlowFiles in multiple 
sessions (which may result in writing to multiple Content Claims).
+ */
+public class ContentAccessIT extends NiFiSystemIT {
+
+    @Test
+    public void 
testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndWrite() throws 
NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(true, false);
+    }
+
+    @Test
+    public void 
testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndWrite() throws 
NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(false, false);
+    }
+
+    @Test
+    public void 
testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndClone() throws 
NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(true, true);
+    }
+
+    @Test
+    public void 
testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndClone() throws 
NiFiClientException, IOException, InterruptedException {
+        testCorrectContentReadWhenMultipleFlowFilesInClaim(false, true);
+    }
+
+    public void testCorrectContentReadWhenMultipleFlowFilesInClaim(final 
boolean useBatch, final boolean clone) throws NiFiClientException, IOException, 
InterruptedException {
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity split = 
getClientUtil().createProcessor("SplitByLine");
+        final ProcessorEntity reverse = 
getClientUtil().createProcessor("ReverseContents");
+        final ProcessorEntity verify = 
getClientUtil().createProcessor("VerifyContents");
+        final ProcessorEntity terminateAa = 
getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateBa = 
getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateCa = 
getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminateUnmatched = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Configure Generate
+        getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
+        getClientUtil().updateProcessorProperties(generate, 
Collections.singletonMap("Text", "{ a : a }\n{ a : b }\n{ a : c }"));
+
+        // Configure split
+        getClientUtil().updateProcessorProperties(split, 
Collections.singletonMap("Use Clone", String.valueOf(clone)));
+
+        // Configure Verify
+        final Map<String, String> verifyProperties = new HashMap<>();
+        verifyProperties.put("aa", "} a : a {");
+        verifyProperties.put("ba", "} b : a {");
+        verifyProperties.put("ca", "} c : a {");
+        getClientUtil().updateProcessorProperties(verify, verifyProperties);
+
+        // Configure batching for reverse
+        final int runDuration = useBatch ? 25 : 0;
+        getClientUtil().updateProcessorRunDuration(reverse, runDuration);
+
+        final ConnectionEntity generateToSplit = 
getClientUtil().createConnection(generate, split, "success");
+        final ConnectionEntity splitToReverse = 
getClientUtil().createConnection(split, reverse, "success");
+        final ConnectionEntity reverseToVerify = 
getClientUtil().createConnection(reverse, verify, "success");
+        final ConnectionEntity verifyToTerminateAa = 
getClientUtil().createConnection(verify, terminateAa, "aa");
+        final ConnectionEntity verifyToTerminateBa = 
getClientUtil().createConnection(verify, terminateBa, "ba");
+        final ConnectionEntity verifyToTerminateCa = 
getClientUtil().createConnection(verify, terminateCa, "ca");
+        final ConnectionEntity verifyToTerminateUnmatched = 
getClientUtil().createConnection(verify, terminateAa, "unmatched");
+
+        // Run Generate processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(generate);
+        waitForQueueCount(generateToSplit.getId(), 1);
+
+        // Run split processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(split);
+        waitForQueueCount(splitToReverse.getId(), 3);
+
+        // Verify output of the Split processor
+        final String firstSplitContents = 
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 0);
+        final String secondSplitContents = 
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 1);
+        final String thirdSplitContents = 
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 2);
+
+        // Verify that we get both expected outputs. We put them in a set and 
ensure that the set contains both because we don't know the order
+        // that they will be in. The reason we don't know the order is because 
if we are using batching, the contents will be in the same output
+        // Content Claim, otherwise they won't be. If they are not, the order 
can change.
+        final Set<String> splitContents = new HashSet<>();
+        splitContents.add(firstSplitContents);
+        splitContents.add(secondSplitContents);
+        splitContents.add(thirdSplitContents);
+
+        assertTrue(splitContents.contains("{ a : a }"));
+        assertTrue(splitContents.contains("{ a : b }"));
+        assertTrue(splitContents.contains("{ a : c }"));
+
+        // Start the reverse processor, wait for its output
+        getNifiClient().getProcessorClient().startProcessor(reverse);
+        waitForQueueCount(reverseToVerify.getId(), 3);
+
+        final String firstReversedContents = 
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
+        final String secondReversedContents = 
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 1);
+        final String thirdReversedContents = 
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 2);
+
+        final Set<String> reversedContents = new HashSet<>();
+        reversedContents.add(firstReversedContents);
+        reversedContents.add(secondReversedContents);
+        reversedContents.add(thirdReversedContents);
+
+        assertTrue(reversedContents.contains("} a : a {"));
+        assertTrue(reversedContents.contains("} b : a {"));
+        assertTrue(reversedContents.contains("} c : a {"));
+
+        // Start verify processor. This is different than verify the contents 
above because doing so above is handled by making a REST call, which does not 
make use
+        // of the ProcessSession. Using the VerifyContents processor ensures 
that the Processors see the same contents.
+        getNifiClient().getProcessorClient().startProcessor(verify);
+
+        waitForQueueCount(verifyToTerminateAa.getId(), 1);
+        waitForQueueCount(verifyToTerminateBa.getId(), 1);
+        waitForQueueCount(verifyToTerminateCa.getId(), 1);
+        waitForQueueCount(verifyToTerminateUnmatched.getId(), 0);
+    }
+}

Reply via email to