[ https://issues.apache.org/jira/browse/NIFI-5879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Payne updated NIFI-5879: ----------------------------- Status: Patch Available (was: Open) > ContentNotFoundException thrown if a FlowFile's content claim is read, then > written to, then read again, within the same ProcessSession > --------------------------------------------------------------------------------------------------------------------------------------- > > Key: NIFI-5879 > URL: https://issues.apache.org/jira/browse/NIFI-5879 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Priority: Major > Fix For: 1.9.0 > > > The following Processor can be used to replicate the issue. > If a processor reads content, then attempts to write to the content, then > read what was just written, a ContentNotFoundException will be thrown. > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.nifi.processors.standard; > import org.apache.nifi.components.PropertyDescriptor; > import org.apache.nifi.components.PropertyDescriptor.Builder; > import org.apache.nifi.flowfile.FlowFile; > import org.apache.nifi.processor.AbstractProcessor; > import org.apache.nifi.processor.ProcessContext; > import org.apache.nifi.processor.ProcessSession; > import org.apache.nifi.processor.Relationship; > import org.apache.nifi.processor.exception.ProcessException; > import org.apache.nifi.stream.io.StreamUtils; > import java.io.IOException; > import java.io.InputStream; > import java.util.ArrayList; > import java.util.Collections; > import java.util.List; > import java.util.Set; > import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; > import static > org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR; > public class ReplicateWeirdness extends AbstractProcessor { > static final PropertyDescriptor CLONE_ITERATIONS = new Builder() > .name("Iterations") > .displayName("Iterations") > .description("Number of Iterations") > .required(true) > .addValidator(POSITIVE_INTEGER_VALIDATOR) > .expressionLanguageSupported(NONE) > .defaultValue("1") > .build(); > static final PropertyDescriptor WRITE_ITERATIONS = new Builder() > .name("Write Iterations") > .displayName("Write Iterations") > .description("Write Iterations") > .required(true) > .addValidator(POSITIVE_INTEGER_VALIDATOR) > .expressionLanguageSupported(NONE) > .defaultValue("2") > .build(); > static final PropertyDescriptor READ_FIRST = new Builder() > .name("Read First") > .displayName("Read First") > .description("Read First") > .required(true) > .allowableValues("true", "false") > .expressionLanguageSupported(NONE) > .defaultValue("false") > .build(); > static final Relationship REL_SUCCESS = new Relationship.Builder() > .name("success") > .build(); > @Override > public Set<Relationship> getRelationships() { > return Collections.singleton(REL_SUCCESS); > } > @Override > protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { > final List<PropertyDescriptor> properties = new ArrayList<>(); > properties.add(CLONE_ITERATIONS); > properties.add(WRITE_ITERATIONS); > properties.add(READ_FIRST); > return properties; > } > @Override > public void onTrigger(final ProcessContext context, final ProcessSession > session) throws ProcessException { > FlowFile original = session.get(); > if (original == null) { > return; > } > try (final InputStream in = session.read(original)) { > final long originalLength = countBytes(in); > getLogger().info("Original FlowFile is " + originalLength + " bytes"); > } catch (final IOException e) { > throw new ProcessException(e); > } > final int cloneIterations = > context.getProperty(CLONE_ITERATIONS).asInteger(); > final int writeIterations = > context.getProperty(WRITE_ITERATIONS).asInteger(); > final boolean readFirst = context.getProperty(READ_FIRST).asBoolean(); > for (int i=0; i < cloneIterations; i++) { > FlowFile clone = session.clone(original); > for (int w = 0; w < writeIterations; w++) { > if (readFirst) { > try (InputStream in = session.read(clone)) { > final long len = countBytes(in); > getLogger().info("Read " + len + " bytes"); > } catch (IOException e) { > throw new ProcessException(e); > } > } > clone = session.write(clone, out -> out.write("boom".getBytes())); > clone = session.write(clone, StreamUtils::copy); > } > session.transfer(clone, REL_SUCCESS); > } > session.transfer(original, REL_SUCCESS); > } > private long countBytes(final InputStream in) throws IOException { > int len = 0; > while (in.read() >= 0) { > len++; > } > return len; > } > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)