This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new cf4e966 NIFI-8337: This closes #4910. Fixed bug in
StandardProcessSession where the session didn't account for FlowFile's
contentClaimOffset when seeking to the appropriate location in the stream.
cf4e966 is described below
commit cf4e966d912975dfeac1cbf80f9e073536405897
Author: Mark Payne <[email protected]>
AuthorDate: Wed Mar 17 13:30:38 2021 -0400
NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the
session didn't account for FlowFile's contentClaimOffset when seeking to the
appropriate location in the stream.
Signed-off-by: Joe Witt <[email protected]>
---
.../repository/StandardProcessSession.java | 4 +-
.../processors/tests/system/ReverseContents.java | 2 +
.../nifi/processors/tests/system/SplitByLine.java | 126 +++++++++++++++++
.../processors/tests/system/VerifyContents.java | 102 ++++++++++++++
.../services/org.apache.nifi.processor.Processor | 2 +
.../apache/nifi/tests/system/NiFiClientUtil.java | 23 ++++
.../nifi/tests/system/processor/RunOnceIT.java | 3 -
.../tests/system/repositories/ContentAccessIT.java | 150 +++++++++++++++++++++
8 files changed, 407 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 1d24f33..b2b1bde 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2300,9 +2300,9 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
currentReadClaim = claim.getResourceClaim();
final InputStream contentRepoStream =
context.getContentRepository().read(claim.getResourceClaim());
- StreamUtils.skip(contentRepoStream, claim.getOffset());
+ StreamUtils.skip(contentRepoStream, claim.getOffset() +
contentClaimOffset);
final InputStream bufferedContentStream = new
BufferedInputStream(contentRepoStream);
- final ByteCountingInputStream byteCountingInputStream = new
ByteCountingInputStream(bufferedContentStream, claim.getOffset());
+ final ByteCountingInputStream byteCountingInputStream = new
ByteCountingInputStream(bufferedContentStream, claim.getOffset() +
contentClaimOffset);
currentReadClaimStream = byteCountingInputStream;
// Use a non-closeable stream (DisableOnCloseInputStream)
because we want to keep it open after the callback has finished so that we can
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
index cfb1124..62c8dc2 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.tests.system;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -31,6 +32,7 @@ import java.io.OutputStream;
import java.util.Collections;
import java.util.Set;
+@SupportsBatching
public class ReverseContents extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
new file mode 100644
index 0000000..93b8e14
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.util.TextLineDemarcator;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class SplitByLine extends AbstractProcessor {
+
+ static final PropertyDescriptor USE_CLONE = new
PropertyDescriptor.Builder()
+ .name("Use Clone")
+ .description("Whether or not to use session.clone for generating
children FlowFiles")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Collections.singletonList(USE_CLONE);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final boolean clone = context.getProperty(USE_CLONE).asBoolean();
+ if (clone) {
+ splitByClone(session, flowFile);
+ } else {
+ splitByWrite(session, flowFile);
+ }
+
+ session.remove(flowFile);
+ }
+
+ private void splitByClone(ProcessSession session, FlowFile flowFile) {
+ final List<TextLineDemarcator.OffsetInfo> offsetInfos = new
ArrayList<>();
+
+ try (final InputStream in = session.read(flowFile);
+ final TextLineDemarcator demarcator = new TextLineDemarcator(in))
{
+
+ TextLineDemarcator.OffsetInfo offsetInfo;
+ while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
+ offsetInfos.add(offsetInfo);
+ }
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ for (final TextLineDemarcator.OffsetInfo offsetInfo : offsetInfos) {
+ FlowFile child = session.clone(flowFile,
offsetInfo.getStartOffset(), offsetInfo.getLength() -
offsetInfo.getCrlfLength());
+ session.putAttribute(child, "num.lines",
String.valueOf(offsetInfos.size()));
+ session.transfer(child, REL_SUCCESS);
+ }
+ }
+
+ private void splitByWrite(ProcessSession session, FlowFile flowFile) {
+ final List<FlowFile> children = new ArrayList<>();
+ try (final InputStream in = session.read(flowFile);
+ final BufferedReader reader = new BufferedReader(new
InputStreamReader(in))) {
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ FlowFile child = session.create(flowFile);
+ children.add(child);
+
+ try (final OutputStream out = session.write(child)) {
+ final byte[] lineBytes =
line.getBytes(StandardCharsets.UTF_8);
+ out.write(lineBytes);
+ }
+ }
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ for (FlowFile child : children) {
+ session.putAttribute(child, "num.lines",
String.valueOf(children.size()));
+ }
+
+ session.transfer(children, REL_SUCCESS);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
new file mode 100644
index 0000000..9454e96
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class VerifyContents extends AbstractProcessor {
+ private static final Relationship REL_UNMATCHED = new
Relationship.Builder()
+ .name("unmatched")
+ .build();
+
+ private final AtomicReference<Set<Relationship>> relationshipsRef = new
AtomicReference<>(Collections.singleton(REL_UNMATCHED));
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .dynamic(true)
+ .addValidator(Validator.VALID)
+ .build();
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ final Relationship relationship = new Relationship.Builder()
+ .name(descriptor.getName())
+ .build();
+
+ final Set<Relationship> updatedRelationships = new
HashSet<>(relationshipsRef.get());
+
+ if (newValue == null) {
+ updatedRelationships.remove(relationship);
+ } else {
+ updatedRelationships.add(relationship);
+ }
+
+ updatedRelationships.add(REL_UNMATCHED); // Ensure that the unmatched
relationship is always available
+ relationshipsRef.set(updatedRelationships);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String contents;
+ try (final InputStream in = session.read(flowFile);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+ StreamUtils.copy(in, baos);
+ contents = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ final String propertyName = entry.getKey().getName();
+ if (contents.equals(entry.getValue())) {
+ getLogger().info("Routing {} to {}", flowFile, propertyName);
+ session.transfer(flowFile, new
Relationship.Builder().name(propertyName).build());
+ return;
+ }
+ }
+
+ getLogger().info("Routing {} to {}", flowFile, REL_UNMATCHED);
+ session.transfer(flowFile, REL_UNMATCHED);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c6588f6..c0b025c 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -26,7 +26,9 @@
org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
org.apache.nifi.processors.tests.system.ReverseContents
org.apache.nifi.processors.tests.system.SetAttribute
org.apache.nifi.processors.tests.system.Sleep
+org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.ValidateFileExists
+org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteToFile
\ No newline at end of file
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 7616a29..feb7bda 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -82,8 +83,10 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -261,6 +264,12 @@ public class NiFiClientUtil {
return updateProcessorConfig(currentEntity, config);
}
+ public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity
currentEntity, final int runDuration) throws NiFiClientException, IOException {
+ final ProcessorConfigDTO config = new ProcessorConfigDTO();
+ config.setRunDurationMillis((long) runDuration);
+ return updateProcessorConfig(currentEntity, config);
+ }
+
public ProcessorEntity updateProcessorSchedulingPeriod(final
ProcessorEntity currentEntity, final String schedulingPeriod) throws
NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setSchedulingPeriod(schedulingPeriod);
@@ -776,6 +785,20 @@ public class NiFiClientUtil {
return flowFileEntity;
}
+ public String getFlowFileContentAsUtf8(final String connectionId, final
int flowFileIndex) throws NiFiClientException, IOException {
+ final byte[] contents = getFlowFileContentAsByteArray(connectionId,
flowFileIndex);
+ return new String(contents, StandardCharsets.UTF_8);
+ }
+
+ public byte[] getFlowFileContentAsByteArray(final String connectionId,
final int flowFileIndex) throws NiFiClientException, IOException {
+ try (final InputStream in = getFlowFileContent(connectionId,
flowFileIndex);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+
+ StreamUtils.copy(in, baos);
+ return baos.toByteArray();
+ }
+ }
+
public InputStream getFlowFileContent(final String connectionId, final int
flowFileIndex) throws NiFiClientException, IOException {
final ListingRequestEntity listing = performQueueListing(connectionId);
final List<FlowFileSummaryDTO> flowFileSummaries =
listing.getListingRequest().getFlowFileSummaries();
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
index 3d2507f..c47523f 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
@@ -30,7 +30,6 @@ public class RunOnceIT extends NiFiSystemIT {
@Test
public void testRunOnce() throws NiFiClientException, IOException,
InterruptedException {
- // GIVEN
ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
@@ -38,10 +37,8 @@ public class RunOnceIT extends NiFiSystemIT {
ConnectionEntity generateToTerminate =
getClientUtil().createConnection(generate, terminate, "success");
- // WHEN
getNifiClient().getProcessorClient().runProcessorOnce(generate);
- // THEN
waitForQueueCount(generateToTerminate.getId(), 1);
ProcessorEntity actualGenerate =
getNifiClient().getProcessorClient().getProcessor(generate.getId());
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
new file mode 100644
index 0000000..1a3790f
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test is intended to verify that Processors are able to access the
content that their FlowFiles represent in several different situations.
+ * We test for things like splitting a FlowFile by creating multiple children
and writing to them, as well as creating children via
processSession.clone(FlowFile flowFile, long offset, long length);
+ * We also test against Run Duration of 0 ms vs. 25 milliseconds in order to
test when a processor writes the contents to multiple FlowFiles in the same
session (which will result in writing to the
+ * same Content Claim) as well as writing to multiple FlowFiles in multiple
sessions (which may result in writing to multiple Content Claims).
+ */
+public class ContentAccessIT extends NiFiSystemIT {
+
+ @Test
+ public void
testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndWrite() throws
NiFiClientException, IOException, InterruptedException {
+ testCorrectContentReadWhenMultipleFlowFilesInClaim(true, false);
+ }
+
+ @Test
+ public void
testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndWrite() throws
NiFiClientException, IOException, InterruptedException {
+ testCorrectContentReadWhenMultipleFlowFilesInClaim(false, false);
+ }
+
+ @Test
+ public void
testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndClone() throws
NiFiClientException, IOException, InterruptedException {
+ testCorrectContentReadWhenMultipleFlowFilesInClaim(true, true);
+ }
+
+ @Test
+ public void
testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndClone() throws
NiFiClientException, IOException, InterruptedException {
+ testCorrectContentReadWhenMultipleFlowFilesInClaim(false, true);
+ }
+
+ public void testCorrectContentReadWhenMultipleFlowFilesInClaim(final
boolean useBatch, final boolean clone) throws NiFiClientException, IOException,
InterruptedException {
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ final ProcessorEntity split =
getClientUtil().createProcessor("SplitByLine");
+ final ProcessorEntity reverse =
getClientUtil().createProcessor("ReverseContents");
+ final ProcessorEntity verify =
getClientUtil().createProcessor("VerifyContents");
+ final ProcessorEntity terminateAa =
getClientUtil().createProcessor("TerminateFlowFile");
+ final ProcessorEntity terminateBa =
getClientUtil().createProcessor("TerminateFlowFile");
+ final ProcessorEntity terminateCa =
getClientUtil().createProcessor("TerminateFlowFile");
+ final ProcessorEntity terminateUnmatched =
getClientUtil().createProcessor("TerminateFlowFile");
+
+ // Configure Generate
+ getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
+ getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("Text", "{ a : a }\n{ a : b }\n{ a : c }"));
+
+ // Configure split
+ getClientUtil().updateProcessorProperties(split,
Collections.singletonMap("Use Clone", String.valueOf(clone)));
+
+ // Configure Verify
+ final Map<String, String> verifyProperties = new HashMap<>();
+ verifyProperties.put("aa", "} a : a {");
+ verifyProperties.put("ba", "} b : a {");
+ verifyProperties.put("ca", "} c : a {");
+ getClientUtil().updateProcessorProperties(verify, verifyProperties);
+
+ // Configure batching for reverse
+ final int runDuration = useBatch ? 25 : 0;
+ getClientUtil().updateProcessorRunDuration(reverse, runDuration);
+
+ final ConnectionEntity generateToSplit =
getClientUtil().createConnection(generate, split, "success");
+ final ConnectionEntity splitToReverse =
getClientUtil().createConnection(split, reverse, "success");
+ final ConnectionEntity reverseToVerify =
getClientUtil().createConnection(reverse, verify, "success");
+ final ConnectionEntity verifyToTerminateAa =
getClientUtil().createConnection(verify, terminateAa, "aa");
+ final ConnectionEntity verifyToTerminateBa =
getClientUtil().createConnection(verify, terminateBa, "ba");
+ final ConnectionEntity verifyToTerminateCa =
getClientUtil().createConnection(verify, terminateCa, "ca");
+ final ConnectionEntity verifyToTerminateUnmatched =
getClientUtil().createConnection(verify, terminateAa, "unmatched");
+
+ // Run Generate processor, wait for its output
+ getNifiClient().getProcessorClient().startProcessor(generate);
+ waitForQueueCount(generateToSplit.getId(), 1);
+
+ // Run split processor, wait for its output
+ getNifiClient().getProcessorClient().startProcessor(split);
+ waitForQueueCount(splitToReverse.getId(), 3);
+
+ // Verify output of the Split processor
+ final String firstSplitContents =
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 0);
+ final String secondSplitContents =
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 1);
+ final String thirdSplitContents =
getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 2);
+
+ // Verify that we get both expected outputs. We put them in a set and
ensure that the set contains both because we don't know the order
+ // that they will be in. The reason we don't know the order is because
if we are using batching, the contents will be in the same output
+ // Content Claim, otherwise they won't be. If they are not, the order
can change.
+ final Set<String> splitContents = new HashSet<>();
+ splitContents.add(firstSplitContents);
+ splitContents.add(secondSplitContents);
+ splitContents.add(thirdSplitContents);
+
+ assertTrue(splitContents.contains("{ a : a }"));
+ assertTrue(splitContents.contains("{ a : b }"));
+ assertTrue(splitContents.contains("{ a : c }"));
+
+ // Start the reverse processor, wait for its output
+ getNifiClient().getProcessorClient().startProcessor(reverse);
+ waitForQueueCount(reverseToVerify.getId(), 3);
+
+ final String firstReversedContents =
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
+ final String secondReversedContents =
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 1);
+ final String thirdReversedContents =
getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 2);
+
+ final Set<String> reversedContents = new HashSet<>();
+ reversedContents.add(firstReversedContents);
+ reversedContents.add(secondReversedContents);
+ reversedContents.add(thirdReversedContents);
+
+ assertTrue(reversedContents.contains("} a : a {"));
+ assertTrue(reversedContents.contains("} b : a {"));
+ assertTrue(reversedContents.contains("} c : a {"));
+
+ // Start verify processor. This is different than verify the contents
above because doing so above is handled by making a REST call, which does not
make use
+ // of the ProcessSession. Using the VerifyContents processor ensures
that the Processors see the same contents.
+ getNifiClient().getProcessorClient().startProcessor(verify);
+
+ waitForQueueCount(verifyToTerminateAa.getId(), 1);
+ waitForQueueCount(verifyToTerminateBa.getId(), 1);
+ waitForQueueCount(verifyToTerminateCa.getId(), 1);
+ waitForQueueCount(verifyToTerminateUnmatched.getId(), 0);
+ }
+}