http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index 0000000,6e5f65d..15591d7 mode 000000,100644..100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@@ -1,0 -1,261 +1,284 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.util; + + import static java.util.Objects.requireNonNull; + + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; ++import java.util.HashSet; + import java.util.LinkedHashMap; + import java.util.List; + import java.util.Map; + import java.util.Objects; ++import java.util.Set; + + import org.apache.nifi.components.ConfigurableComponent; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.controller.ControllerServiceLookup; ++import org.apache.nifi.processor.Processor; ++import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.SchedulingContext; + import org.junit.Assert; + + public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup { + + private final ConfigurableComponent component; + private final Map<PropertyDescriptor, String> properties = new HashMap<>(); + + private String annotationData = null; + private boolean yieldCalled = false; + private boolean enableExpressionValidation = false; + private boolean allowExpressionValidation = true; + ++ private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); ++ + /** + * Creates a new MockProcessContext for the given Processor + * + * @param component + */ + public MockProcessContext(final ConfigurableComponent component) { + this.component = Objects.requireNonNull(component); + } + + public MockProcessContext(final ControllerService component, final MockProcessContext context) { + this(component); + + try { + annotationData = context.getControllerServiceAnnotationData(component); + final Map<PropertyDescriptor, String> props = context.getControllerServiceProperties(component); + properties.putAll(props); + } catch (IllegalArgumentException e) { + // do nothing...the service is being loaded + } + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor descriptor) { + return getProperty(descriptor.getName()); + } + + @Override + public PropertyValue getProperty(final String propertyName) { + final PropertyDescriptor descriptor = component.getPropertyDescriptor(propertyName); + if (descriptor == null) { + return null; + } + + final String setPropertyValue = properties.get(descriptor); + final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue; + return new MockPropertyValue(propValue, this, (enableExpressionValidation && allowExpressionValidation) ? descriptor : null); + } + + @Override + public PropertyValue newPropertyValue(final String rawValue) { + return new MockPropertyValue(rawValue, this); + } + + public ValidationResult setProperty(final String propertyName, final String propertyValue) { + return setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), propertyValue); + } + + /** + * Updates the value of the property with the given PropertyDescriptor to + * the specified value IF and ONLY IF the value is valid according to the + * descriptor's validator. Otherwise, the property value is not updated. In + * either case, the ValidationResult is returned, indicating whether or not + * the property is valid + * + * @param descriptor + * @param value + * @return + */ + public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) { + requireNonNull(descriptor); + requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead"); + final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName()); + + final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this)); + String oldValue = properties.put(fullyPopulatedDescriptor, value); + if (oldValue == null) { + oldValue = fullyPopulatedDescriptor.getDefaultValue(); + } + if ((value == null && oldValue != null) || (value != null && !value.equals(oldValue))) { + component.onPropertyModified(fullyPopulatedDescriptor, oldValue, value); + } + + return result; + } + + public boolean removeProperty(final PropertyDescriptor descriptor) { + Objects.requireNonNull(descriptor); + final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName()); + String value = null; + if (!fullyPopulatedDescriptor.isRequired() && (value = properties.remove(fullyPopulatedDescriptor)) != null) { + component.onPropertyModified(fullyPopulatedDescriptor, value, null); + return true; + } + return false; + } + + @Override + public void yield() { + yieldCalled = true; + } + + public boolean isYieldCalled() { + return yieldCalled; + } + + public void addControllerService(final String serviceIdentifier, final ControllerService controllerService, final Map<PropertyDescriptor, String> properties, final String annotationData) { + requireNonNull(controllerService); + final ControllerServiceConfiguration config = addControllerService(controllerService); + config.setProperties(properties); + config.setAnnotationData(annotationData); + } + + @Override + public int getMaxConcurrentTasks() { + return 1; + } + + public void setAnnotationData(final String annotationData) { + this.annotationData = annotationData; + } + + @Override + public String getAnnotationData() { + return annotationData; + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + final List<PropertyDescriptor> supported = component.getPropertyDescriptors(); + if (supported == null || supported.isEmpty()) { + return Collections.unmodifiableMap(properties); + } else { + final Map<PropertyDescriptor, String> props = new LinkedHashMap<>(); + for (final PropertyDescriptor descriptor : supported) { + props.put(descriptor, null); + } + props.putAll(properties); + return props; + } + } + + /** + * Validates the current properties, returning ValidationResults for any + * invalid properties. All processor defined properties will be validated. + * If they are not included in the in the purposed configuration, the + * default value will be used. + * + * @return Collection of validation result objects for any invalid findings + * only. If the collection is empty then the processor is valid. Guaranteed + * non-null + */ + public Collection<ValidationResult> validate() { + return component.validate(new MockValidationContext(this)); + } + + public boolean isValid() { + for (final ValidationResult result : validate()) { + if (!result.isValid()) { + return false; + } + } + + return true; + } + + public void assertValid() { + final StringBuilder sb = new StringBuilder(); + int failureCount = 0; + + for (final ValidationResult result : validate()) { + if (!result.isValid()) { + sb.append(result.toString()).append("\n"); + failureCount++; + } + } + + if (failureCount > 0) { + Assert.fail("Processor has " + failureCount + " validation failures:\n" + sb.toString()); + } + } + + @Override + public String encrypt(final String unencrypted) { + return "enc{" + unencrypted + "}"; + } + + @Override + public String decrypt(final String encrypted) { + if (encrypted.startsWith("enc{") && encrypted.endsWith("}")) { + return encrypted.substring(4, encrypted.length() - 2); + } + return encrypted; + } + + public void setValidateExpressionUsage(final boolean validate) { + allowExpressionValidation = validate; + } + + public void enableExpressionValidation() { + enableExpressionValidation = true; + } + + public void disableExpressionValidation() { + enableExpressionValidation = false; + } + + Map<PropertyDescriptor, String> getControllerServiceProperties(final ControllerService controllerService) { + return super.getConfiguration(controllerService.getIdentifier()).getProperties(); + } + + String getControllerServiceAnnotationData(final ControllerService controllerService) { + return super.getConfiguration(controllerService.getIdentifier()).getAnnotationData(); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return this; + } + + @Override + public void leaseControllerService(final String identifier) { + } + ++ public Set<Relationship> getAvailableRelationships() { ++ if ( !(component instanceof Processor) ) { ++ return Collections.emptySet(); ++ } ++ ++ final Set<Relationship> relationships = new HashSet<>(((Processor) component).getRelationships()); ++ relationships.removeAll(unavailableRelationships); ++ return relationships; ++ } ++ ++ public void setUnavailableRelationships(final Set<Relationship> relationships) { ++ this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); ++ } ++ ++ public Set<Relationship> getUnavailableRelationships() { ++ return unavailableRelationships; ++ } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 0000000,552780c..ea55b34 mode 000000,100644..100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@@ -1,0 -1,1010 +1,1006 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.util; + + import java.io.ByteArrayInputStream; + import java.io.ByteArrayOutputStream; + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.file.Files; + import java.nio.file.OpenOption; + import java.nio.file.Path; + import java.nio.file.StandardOpenOption; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Objects; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.regex.Pattern; + + import org.junit.Assert; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.processor.FlowFileFilter; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.QueueSize; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.exception.FlowFileAccessException; + import org.apache.nifi.processor.exception.FlowFileHandlingException; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.provenance.ProvenanceReporter; + + public class MockProcessSession implements ProcessSession { + + private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); + private final MockFlowFileQueue processorQueue; + private final Set<Long> beingProcessed = new HashSet<>(); + + private final Map<Long, MockFlowFile> currentVersions = new HashMap<>(); + private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); + private final SharedSessionState sharedState; + private final Map<String, Long> counterMap = new HashMap<>(); + + private boolean committed = false; + private boolean rolledback = false; + private int removedCount = 0; + + public MockProcessSession(final SharedSessionState sharedState) { + this.sharedState = sharedState; + this.processorQueue = sharedState.getFlowFileQueue(); + } + + @Override + public void adjustCounter(final String name, final long delta, final boolean immediate) { + if (immediate) { + sharedState.adjustCounter(name, delta); + return; + } + + Long counter = counterMap.get(name); + if (counter == null) { + counter = delta; + counterMap.put(name, counter); + return; + } + + counter = counter + delta; + counterMap.put(name, counter); + } + + @Override + public MockFlowFile clone(final FlowFile flowFile) { + validateState(flowFile); + final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) { + validateState(flowFile); + if (offset + size > flowFile.getSize()) { + throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString()); + } + + final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); + final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); + newFlowFile.setData(newContent); + + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public void commit() { + if (!beingProcessed.isEmpty()) { + throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed); + } + committed = true; + beingProcessed.clear(); + currentVersions.clear(); + originalVersions.clear(); + + for (final Map.Entry<String, Long> entry : counterMap.entrySet()) { + sharedState.adjustCounter(entry.getKey(), entry.getValue()); + } + + counterMap.clear(); + } + + /** + * Clear the 'committed' flag so that we can test that the next iteration of + * {@link nifi.processor.Processor#onTrigger} commits or rolls back the + * session + */ + public void clearCommited() { + committed = false; + } + + /** + * Clear the 'rolledBack' flag so that we can test that the next iteration + * of {@link nifi.processor.Processor#onTrigger} commits or rolls back the + * session + */ + public void clearRollback() { + rolledback = false; + } + + @Override + public MockFlowFile create() { + final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); + currentVersions.put(flowFile.getId(), flowFile); + beingProcessed.add(flowFile.getId()); + return flowFile; + } + + @Override + public MockFlowFile create(final FlowFile flowFile) { + MockFlowFile newFlowFile = create(); + newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public MockFlowFile create(final Collection<FlowFile> flowFiles) { + MockFlowFile newFlowFile = create(); + newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public void exportTo(final FlowFile flowFile, final OutputStream out) { + validateState(flowFile); + if (flowFile == null || out == null) { + throw new IllegalArgumentException("arguments cannot be null"); + } + + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + + try { + out.write(mock.getData()); + } catch (IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public void exportTo(final FlowFile flowFile, final Path path, final boolean append) { + validateState(flowFile); + if (flowFile == null || path == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + + final OpenOption mode = append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE; + + try (final OutputStream out = Files.newOutputStream(path, mode)) { + out.write(mock.getData()); + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public MockFlowFile get() { + final MockFlowFile flowFile = processorQueue.poll(); + if (flowFile != null) { + beingProcessed.add(flowFile.getId()); + currentVersions.put(flowFile.getId(), flowFile); + originalVersions.put(flowFile.getId(), flowFile); + } + return flowFile; + } + + @Override + public List<FlowFile> get(final int maxResults) { + final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, maxResults)); + for (int i = 0; i < maxResults; i++) { + final MockFlowFile nextFlowFile = get(); + if (nextFlowFile == null) { + return flowFiles; + } + + flowFiles.add(nextFlowFile); + } + + return flowFiles; + } + + @Override + public List<FlowFile> get(final FlowFileFilter filter) { + final List<FlowFile> flowFiles = new ArrayList<>(); + final List<MockFlowFile> unselected = new ArrayList<>(); + + while (true) { + final MockFlowFile flowFile = processorQueue.poll(); + if (flowFile == null) { + break; + } + + final FlowFileFilter.FlowFileFilterResult filterResult = filter.filter(flowFile); + if (filterResult.isAccept()) { + flowFiles.add(flowFile); + + beingProcessed.add(flowFile.getId()); + currentVersions.put(flowFile.getId(), flowFile); + originalVersions.put(flowFile.getId(), flowFile); + } else { + unselected.add(flowFile); + } + + if (!filterResult.isContinue()) { + break; + } + } + + processorQueue.addAll(unselected); + return flowFiles; + } + + @Override + public QueueSize getQueueSize() { + return processorQueue.size(); + } + + @Override + public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) { + validateState(flowFile); + if (in == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + try { + final byte[] data = readFully(in); + newFlowFile.setData(data); + return newFlowFile; + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) { + validateState(flowFile); + if (path == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + Files.copy(path, baos); + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + + newFlowFile.setData(baos.toByteArray()); + newFlowFile = putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), path.getFileName().toString()); + return newFlowFile; + } + - @Override - public Set<Relationship> getAvailableRelationships() { - return sharedState.getAvailableRelationships(); - } + + @Override + public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { + for (final FlowFile source : sources) { + validateState(source); + } + validateState(destination); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (final FlowFile flowFile : sources) { + final MockFlowFile mock = (MockFlowFile) flowFile; + final byte[] data = mock.getData(); + try { + baos.write(data); + } catch (final IOException e) { + throw new AssertionError("Failed to write to BAOS"); + } + } + + final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); + newFlowFile.setData(baos.toByteArray()); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + return newFlowFile; + } + + @Override + public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attrs) { + validateState(flowFile); + if (attrs == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.putAttributes(attrs); + return newFlowFile; + } + + @Override + public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) { + validateState(flowFile); + if (attrName == null || attrValue == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); + } + + if ("uuid".equals(attrName)) { + Assert.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put(attrName, attrValue); + newFlowFile.putAttributes(attrs); + return newFlowFile; + } + + @Override + public void read(final FlowFile flowFile, final InputStreamCallback callback) { + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + + validateState(flowFile); + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); + try { + callback.process(bais); + } catch (IOException e) { + throw new ProcessException(e.toString(), e); + } + } + + @Override + public void remove(final FlowFile flowFile) { + validateState(flowFile); + final Iterator<Long> itr = beingProcessed.iterator(); + while (itr.hasNext()) { + final Long ffId = itr.next(); + if (ffId != null && ffId.equals(flowFile.getId())) { + itr.remove(); + beingProcessed.remove(ffId); + removedCount++; + currentVersions.remove(ffId); + return; + } + } + + throw new ProcessException(flowFile + " not found in queue"); + } + + @Override + public void remove(final Collection<FlowFile> flowFiles) { + for (final FlowFile flowFile : flowFiles) { + validateState(flowFile); + } + + for (final FlowFile flowFile : flowFiles) { + remove(flowFile); + } + } + + @Override + public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> attrNames) { + validateState(flowFile); + if (attrNames == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.removeAttributes(attrNames); + return newFlowFile; + } + + @Override + public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) { + validateState(flowFile); + if (flowFile == null) { + throw new IllegalArgumentException("flowFile cannot be null"); + } + if (keyPattern == null) { + return (MockFlowFile) flowFile; + } + + final Set<String> attrsToRemove = new HashSet<>(); + for (final String key : flowFile.getAttributes().keySet()) { + if (keyPattern.matcher(key).matches()) { + attrsToRemove.add(key); + } + } + + return removeAllAttributes(flowFile, attrsToRemove); + } + + @Override + public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) { + validateState(flowFile); + if (attrName == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final Set<String> attrNames = new HashSet<>(); + attrNames.add(attrName); + newFlowFile.removeAttributes(attrNames); + return newFlowFile; + } + + @Override + public void rollback() { + rollback(false); + } + + @Override + public void rollback(final boolean penalize) { + for (final List<MockFlowFile> list : transferMap.values()) { + for (final MockFlowFile flowFile : list) { + processorQueue.offer(flowFile); + } + } + + for (final Long flowFileId : beingProcessed) { + final MockFlowFile flowFile = originalVersions.get(flowFileId); + if (flowFile != null) { + processorQueue.offer(flowFile); + } + } + + rolledback = true; + beingProcessed.clear(); + currentVersions.clear(); + originalVersions.clear(); + transferMap.clear(); + clearTransferState(); + } + + @Override + public void transfer(final FlowFile flowFile) { + validateState(flowFile); + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("I only accept MockFlowFile"); + } + + beingProcessed.remove(flowFile.getId()); + processorQueue.offer((MockFlowFile) flowFile); + } + + @Override + public void transfer(final Collection<FlowFile> flowFiles) { + for (final FlowFile flowFile : flowFiles) { + transfer(flowFile); + } + } + + @Override + public void transfer(final FlowFile flowFile, final Relationship relationship) { + validateState(flowFile); + List<MockFlowFile> list = transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + transferMap.put(relationship, list); + } + + beingProcessed.remove(flowFile.getId()); + list.add((MockFlowFile) flowFile); + } + + @Override + public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) { + for (final FlowFile flowFile : flowFiles) { + validateState(flowFile); + } + + List<MockFlowFile> list = transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + transferMap.put(relationship, list); + } + + for (final FlowFile flowFile : flowFiles) { + beingProcessed.remove(flowFile.getId()); + list.add((MockFlowFile) flowFile); + } + } + + @Override + public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + callback.process(baos); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.setData(baos.toByteArray()); + return newFlowFile; + } + + @Override + public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + baos.write(mock.getData()); + callback.process(baos); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.setData(baos.toByteArray()); + return newFlowFile; + } + + @Override + public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData()); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + callback.process(in, out); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + newFlowFile.setData(out.toByteArray()); + + return newFlowFile; + } + + private byte[] readFully(final InputStream in) throws IOException { + final byte[] buffer = new byte[4096]; + int len; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + while ((len = in.read(buffer)) >= 0) { + baos.write(buffer, 0, len); + } + + return baos.toByteArray(); + } + + public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) { + List<MockFlowFile> list = this.transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + } + + return list; + } + + /** + * Returns a List of FlowFiles in the order in which they were transferred + * to the given relationship + * + * @param relationship + * @return + */ + public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { + final Relationship procRel = new Relationship.Builder().name(relationship).build(); + return getFlowFilesForRelationship(procRel); + } + + public MockFlowFile createFlowFile(final File file) throws IOException { + return createFlowFile(Files.readAllBytes(file.toPath())); + } + + public MockFlowFile createFlowFile(final byte[] data) { + final MockFlowFile flowFile = create(); + flowFile.setData(data); + return flowFile; + } + + public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) { + final MockFlowFile ff = createFlowFile(data); + ff.putAttributes(attrs); + return ff; + } + + @Override + public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) { + for (final FlowFile flowFile : sources) { + validateState(flowFile); + } + validateState(destination); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + if (header != null) { + baos.write(header); + } + + int count = 0; + for (final FlowFile flowFile : sources) { + baos.write(((MockFlowFile) flowFile).getData()); + if (demarcator != null && ++count != sources.size()) { + baos.write(demarcator); + } + } + + if (footer != null) { + baos.write(footer); + } + } catch (final IOException e) { + throw new AssertionError("failed to write data to BAOS"); + } + + final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); + newFlowFile.setData(baos.toByteArray()); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + return newFlowFile; + } + + private void validateState(final FlowFile flowFile) { + Objects.requireNonNull(flowFile); + final FlowFile currentVersion = currentVersions.get(flowFile.getId()); + if (currentVersion == null) { + throw new FlowFileHandlingException(flowFile + " is not known in this session"); + } + + if (currentVersion != flowFile) { + throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session"); + } + + for (final List<MockFlowFile> flowFiles : transferMap.values()) { + if (flowFiles.contains(flowFile)) { + throw new IllegalStateException(flowFile + " has already been transferred"); + } + } + } + + /** + * Inherits the attributes from the given source flow file into another flow + * file. The UUID of the source becomes the parent UUID of this flow file. + * If a parent uuid had previously been established it will be replaced by + * the uuid of the given source + * + * @param source the FlowFile from which to copy attributes + * @param destination the FlowFile to which to copy attributes + */ + private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) { + if (source == null || destination == null || source == destination) { + return destination; //don't need to inherit from ourselves + } + FlowFile updated = putAllAttributes(destination, source.getAttributes()); + getProvenanceReporter().fork(source, Collections.singletonList(updated)); + return updated; + } + + /** + * Inherits the attributes from the given source flow files into the + * destination flow file. The UUIDs of the sources becomes the parent UUIDs + * of the destination flow file. Only attributes which is common to all + * source items is copied into this flow files attributes. Any previously + * established parent UUIDs will be replaced by the UUIDs of the sources. It + * will capture the uuid of a certain number of source objects and may not + * capture all of them. How many it will capture is unspecified. + * + * @param sources + */ + private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) { + final String uuid = destination.getAttribute(CoreAttributes.UUID.key()); + final StringBuilder parentUuidBuilder = new StringBuilder(); + int uuidsCaptured = 0; + for (final FlowFile source : sources) { + if (source == destination) { + continue; //don't want to capture parent uuid of this. Something can't be a child of itself + } + final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key()); + if (sourceUuid != null && !sourceUuid.trim().isEmpty()) { + uuidsCaptured++; + if (parentUuidBuilder.length() > 0) { + parentUuidBuilder.append(","); + } + parentUuidBuilder.append(sourceUuid); + } + + if (uuidsCaptured > 100) { + break; + } + } + + FlowFile updated = putAllAttributes(destination, intersectAttributes(sources)); + getProvenanceReporter().join(sources, updated); + return updated; + } + + /** + * Returns the attributes that are common to every flow file given. The key + * and value must match exactly. + * + * @param flowFileList a list of flow files + * + * @return the common attributes + */ + private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { + final Map<String, String> result = new HashMap<>(); + //trivial cases + if (flowFileList == null || flowFileList.isEmpty()) { + return result; + } else if (flowFileList.size() == 1) { + result.putAll(flowFileList.iterator().next().getAttributes()); + } + + /* + * Start with the first attribute map and only put an entry to the + * resultant map if it is common to every map. + */ + final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); + + outer: + for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { + final String key = mapEntry.getKey(); + final String value = mapEntry.getValue(); + for (final FlowFile flowFile : flowFileList) { + final Map<String, String> currMap = flowFile.getAttributes(); + final String curVal = currMap.get(key); + if (curVal == null || !curVal.equals(value)) { + continue outer; + } + } + result.put(key, value); + } + + return result; + } + + /** + * Assert that {@link #commit()} has been called + */ + public void assertCommitted() { + Assert.assertTrue("Session was not committed", committed); + } + + /** + * Assert that {@link #commit()} has not been called + */ + public void assertNotCommitted() { + Assert.assertFalse("Session was committed", committed); + } + + /** + * Assert that {@link #rollback()} has been called + */ + public void assertRolledBack() { + Assert.assertTrue("Session was not rolled back", rolledback); + } + + /** + * Assert that {@link #rollback()} has not been called + */ + public void assertNotRolledBack() { + Assert.assertFalse("Session was rolled back", rolledback); + } + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship + * @param count + */ + public void assertTransferCount(final Relationship relationship, final int count) { + final int transferCount = getFlowFilesForRelationship(relationship).size(); + Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to " + + relationship + " but actual transfer count was " + transferCount, count, transferCount); + } + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship + * @param count + */ + public void assertTransferCount(final String relationship, final int count) { + assertTransferCount(new Relationship.Builder().name(relationship).build(), count); + } + + /** + * Assert that there are no FlowFiles left on the input queue. + */ + public void assertQueueEmpty() { + Assert.assertTrue("FlowFile Queue has " + this.processorQueue.size() + " items", this.processorQueue.isEmpty()); + } + + /** + * Assert that at least one FlowFile is on the input queue + */ + public void assertQueueNotEmpty() { + Assert.assertFalse("FlowFile Queue is empty", this.processorQueue.isEmpty()); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship + */ + public void assertAllFlowFilesTransferred(final String relationship) { + assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build()); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship + */ + public void assertAllFlowFilesTransferred(final Relationship relationship) { + for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { + final Relationship rel = entry.getKey(); + final List<MockFlowFile> flowFiles = entry.getValue(); + + if (!rel.equals(relationship) && flowFiles != null && !flowFiles.isEmpty()) { + Assert.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + flowFiles.size() + " were routed to " + rel); + } + } + } + + /** + * Removes all state information about FlowFiles that have been transferred + */ + public void clearTransferState() { + this.transferMap.clear(); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship + * @param count + */ + public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship + * @param count + */ + public void assertAllFlowFilesTransferred(final String relationship, final int count) { + assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count); + } + + /** + * Returns the number of FlowFiles that were removed + * + * @return + */ + public int getRemovedCount() { + return removedCount; + } + + @Override + public ProvenanceReporter getProvenanceReporter() { + return sharedState.getProvenanceReporter(); + } + + @Override + public MockFlowFile penalize(final FlowFile flowFile) { + validateState(flowFile); + final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; + mockFlowFile.setPenalized(); + return mockFlowFile; + } + + public byte[] getContentAsByteArray(final MockFlowFile flowFile) { + validateState(flowFile); + return flowFile.getData(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java ---------------------------------------------------------------------- diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java index 0000000,96bef71..13a87de mode 000000,100644..100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@@ -1,0 -1,91 +1,72 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.util; + -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.AtomicLong; + + import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; + import org.apache.nifi.provenance.ProvenanceReporter; + + public class SharedSessionState { + + private final MockFlowFileQueue flowFileQueue; + private final ProvenanceReporter provenanceReporter; ++ @SuppressWarnings("unused") + private final Processor processor; + private final AtomicLong flowFileIdGenerator; + private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); + - private volatile Set<Relationship> unavailableRelationships; + + public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { + flowFileQueue = new MockFlowFileQueue(); + provenanceReporter = new MockProvenanceReporter(); - unavailableRelationships = new HashSet<>(); + this.flowFileIdGenerator = flowFileIdGenerator; + this.processor = processor; + } + - public Set<Relationship> getAvailableRelationships() { - final Set<Relationship> relationships = new HashSet<>(processor.getRelationships()); - relationships.removeAll(unavailableRelationships); - return relationships; - } - - public void setUnavailableRelationships(final Set<Relationship> relationships) { - this.unavailableRelationships = Collections.unmodifiableSet(new HashSet<>(relationships)); - } - - public Set<Relationship> getUnavailableRelationships() { - return unavailableRelationships; - } - + public MockFlowFileQueue getFlowFileQueue() { + return flowFileQueue; + } + + public ProvenanceReporter getProvenanceReporter() { + return provenanceReporter; + } + + public long nextFlowFileId() { + return flowFileIdGenerator.getAndIncrement(); + } + + public void adjustCounter(final String name, final long delta) { + AtomicLong counter = counterMap.get(name); + if (counter == null) { + counter = new AtomicLong(0L); + AtomicLong existingCounter = counterMap.putIfAbsent(name, counter); + if (existingCounter != null) { + counter = existingCounter; + } + } + + counter.addAndGet(delta); + } + + public Long getCounterValue(final String name) { + final AtomicLong counterValue = counterMap.get(name); + return (counterValue == null) ? null : counterValue.get(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --cc nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 0000000,54b611d..40d5035 mode 000000,100644..100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@@ -1,0 -1,492 +1,492 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.util; + + import static java.util.Objects.requireNonNull; + + import java.io.ByteArrayInputStream; + import java.io.IOException; + import java.io.InputStream; + import java.lang.reflect.InvocationTargetException; + import java.nio.file.Files; + import java.nio.file.Path; + import java.util.ArrayList; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.atomic.AtomicLong; + + import org.apache.nifi.components.AllowableValue; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.controller.ControllerService; + import org.apache.nifi.controller.annotation.OnConfigured; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.Processor; + import org.apache.nifi.processor.QueueSize; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.annotation.OnAdded; + import org.apache.nifi.processor.annotation.OnScheduled; + import org.apache.nifi.processor.annotation.OnShutdown; + import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.annotation.OnUnscheduled; + import org.apache.nifi.processor.annotation.TriggerSerially; + import org.apache.nifi.provenance.ProvenanceReporter; + import org.apache.nifi.reporting.InitializationException; + + import org.junit.Assert; + + public class StandardProcessorTestRunner implements TestRunner { + + private final Processor processor; + private final MockProcessContext context; + private final MockFlowFileQueue flowFileQueue; + private final MockSessionFactory sessionFactory; + private final SharedSessionState sharedState; + private final AtomicLong idGenerator; + private final boolean triggerSerially; + + private int numThreads = 1; + private final AtomicInteger invocations = new AtomicInteger(0); + + StandardProcessorTestRunner(final Processor processor) { + this.processor = processor; + this.idGenerator = new AtomicLong(0L); + this.sharedState = new SharedSessionState(processor, idGenerator); + this.flowFileQueue = sharedState.getFlowFileQueue(); + this.sessionFactory = new MockSessionFactory(sharedState); + this.context = new MockProcessContext(processor); + + final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context); + processor.initialize(mockInitContext); + + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + } catch (Exception e) { + Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + e); + } + + triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class); + } + + @Override + public void setValidateExpressionUsage(final boolean validate) { + context.setValidateExpressionUsage(validate); + } + + @Override + public Processor getProcessor() { + return processor; + } + + @Override + public MockProcessContext getProcessContext() { + return context; + } + + @Override + public void run() { + run(1); + } + + @Override + public void run(int iterations) { + run(iterations, true); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish) { + run(iterations, stopOnFinish, true); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { + if (iterations < 1) { + throw new IllegalArgumentException(); + } + + context.assertValid(); + context.enableExpressionValidation(); + try { + if ( initialize ) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e); + } + } + + final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + @SuppressWarnings("unchecked") + final Future<Throwable>[] futures = new Future[iterations]; + for (int i = 0; i < iterations; i++) { + final Future<Throwable> future = executorService.submit(new RunProcessor()); + futures[i] = future; + } + + executorService.shutdown(); + + int finishedCount = 0; + boolean unscheduledRun = false; + for (final Future<Throwable> future : futures) { + try { + final Throwable thrown = future.get(); // wait for the result + if (thrown != null) { + throw new AssertionError(thrown); + } + + if (++finishedCount == 1) { + unscheduledRun = true; + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); + } catch (Exception e) { + Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); + } + } + } catch (final Exception e) { + } + } + + if (!unscheduledRun) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); + } catch (Exception e) { + Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); + } + } + + if (stopOnFinish) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor); + } catch (Exception e) { + Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e); + } + } + } finally { + context.disableExpressionValidation(); + } + } + + @Override + public void shutdown() { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor); + } catch (Exception e) { + Assert.fail("Could not invoke methods annotated with @OnShutdown annotation due to: " + e); + } + } + + private class RunProcessor implements Callable<Throwable> { + + @Override + public Throwable call() throws Exception { + invocations.incrementAndGet(); + try { + processor.onTrigger(context, sessionFactory); + } catch (final Throwable t) { + return t; + } + + return null; + } + } + + @Override + public ProcessSessionFactory getProcessSessionFactory() { + return sessionFactory; + } + + @Override + public void assertAllFlowFilesTransferred(final String relationship) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFilesTransferred(relationship); + } + } + + @Override + public void assertAllFlowFilesTransferred(final Relationship relationship) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFilesTransferred(relationship); + } + } + + @Override + public void assertAllFlowFilesTransferred(final String relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + @Override + public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + @Override + public void assertTransferCount(final Relationship relationship, final int count) { + Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size()); + } + + @Override + public void assertTransferCount(final String relationship, final int count) { + Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size()); + } + + @Override + public void assertValid() { + context.assertValid(); + } + + @Override + public void assertNotValid() { + Assert.assertFalse("Processor appears to be valid but expected it to be invalid", context.isValid()); + } + + @Override + public boolean isQueueEmpty() { + return flowFileQueue.isEmpty(); + } + + @Override + public void assertQueueEmpty() { + Assert.assertTrue(flowFileQueue.isEmpty()); + } + + @Override + public void assertQueueNotEmpty() { + Assert.assertFalse(flowFileQueue.isEmpty()); + } + + @Override + public void clearTransferState() { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.clearTransferState(); + } + } + + @Override + public void enqueue(final FlowFile... flowFiles) { + for (final FlowFile flowFile : flowFiles) { + flowFileQueue.offer((MockFlowFile) flowFile); + } + } + + @Override + public void enqueue(final Path path) throws IOException { + enqueue(path, new HashMap<String, String>()); + } + + @Override + public void enqueue(final Path path, final Map<String, String> attributes) throws IOException { + final Map<String, String> modifiedAttributes = new HashMap<>(attributes); + if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) { + modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName()); + } + try (final InputStream in = Files.newInputStream(path)) { + enqueue(in, modifiedAttributes); + } + } + + @Override + public void enqueue(final byte[] data) { + enqueue(data, new HashMap<String, String>()); + } + + @Override + public void enqueue(final byte[] data, final Map<String, String> attributes) { + enqueue(new ByteArrayInputStream(data), attributes); + } + + @Override + public void enqueue(final InputStream data) { + enqueue(data, new HashMap<String, String>()); + } + + @Override + public void enqueue(final InputStream data, final Map<String, String> attributes) { + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator)); + MockFlowFile flowFile = session.create(); + flowFile = session.importFrom(data, flowFile); + flowFile = session.putAllAttributes(flowFile, attributes); + enqueue(flowFile); + } + + @Override + public byte[] getContentAsByteArray(final MockFlowFile flowFile) { + return flowFile.getData(); + } + + @Override + public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { + final Relationship rel = new Relationship.Builder().name(relationship).build(); + return getFlowFilesForRelationship(rel); + } + + @Override + public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) { + final List<MockFlowFile> flowFiles = new ArrayList<>(); + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + flowFiles.addAll(session.getFlowFilesForRelationship(relationship)); + } + + Collections.sort(flowFiles, new Comparator<MockFlowFile>() { + @Override + public int compare(final MockFlowFile o1, final MockFlowFile o2) { + return Long.compare(o1.getCreationTime(), o2.getCreationTime()); + } + }); + + return flowFiles; + } + + @Override + public ProvenanceReporter getProvenanceReporter() { + return sharedState.getProvenanceReporter(); + } + + @Override + public QueueSize getQueueSize() { + return flowFileQueue.size(); + } + + @Override + public Long getCounterValue(final String name) { + return sharedState.getCounterValue(name); + } + + @Override + public int getRemovedCount() { + int count = 0; + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + count += session.getRemovedCount(); + } + + return count; + } + + @Override + public void setAnnotationData(final String annotationData) { + context.setAnnotationData(annotationData); + } + + @Override + public ValidationResult setProperty(final String propertyName, final String propertyValue) { + return context.setProperty(propertyName, propertyValue); + } + + @Override + public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) { + return context.setProperty(descriptor, value); + } + + @Override + public ValidationResult setProperty(final PropertyDescriptor descriptor, final AllowableValue value) { + return context.setProperty(descriptor, value.getValue()); + } + + @Override + public void setThreadCount(final int threadCount) { + if (threadCount > 1 && triggerSerially) { + Assert.fail("Cannot set thread-count higher than 1 because the processor is triggered serially"); + } + + this.numThreads = threadCount; + } + + @Override + public int getThreadCount() { + return numThreads; + } + + @Override + public void setRelationshipAvailable(final Relationship relationship) { - final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); ++ final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); + unavailable.remove(relationship); - sharedState.setUnavailableRelationships(unavailable); ++ context.setUnavailableRelationships(unavailable); + } + + @Override + public void setRelationshipAvailable(final String relationshipName) { + setRelationshipAvailable(new Relationship.Builder().name(relationshipName).build()); + } + + @Override + public void setRelationshipUnavailable(final Relationship relationship) { - final Set<Relationship> unavailable = new HashSet<>(sharedState.getUnavailableRelationships()); ++ final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); + unavailable.add(relationship); - sharedState.setUnavailableRelationships(unavailable); ++ context.setUnavailableRelationships(unavailable); + } + + @Override + public void setRelationshipUnavailable(final String relationshipName) { + setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build()); + } + + @Override + public void addControllerService(final String identifier, final ControllerService service) throws InitializationException { + addControllerService(identifier, service, new HashMap<String, String>()); + } + + @Override + public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException { + final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier)); + service.initialize(initContext); + + final Map<PropertyDescriptor, String> resolvedProps = new HashMap<>(); + for (final Map.Entry<String, String> entry : properties.entrySet()) { + resolvedProps.put(service.getPropertyDescriptor(entry.getKey()), entry.getValue()); + } + + final MockConfigurationContext configurationContext = new MockConfigurationContext(resolvedProps, context); + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, service, configurationContext); + } catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException e) { + throw new InitializationException(e); + } + + context.addControllerService(identifier, service, resolvedProps, null); + } + + @Override + public ControllerService getControllerService(final String identifier) { + return context.getControllerService(identifier); + } + + @Override + public <T extends ControllerService> T getControllerService(final String identifier, final Class<T> serviceType) { + final ControllerService service = context.getControllerService(identifier); + return serviceType.cast(service); + } + + @Override + public boolean removeProperty(PropertyDescriptor descriptor) { + return context.removeProperty(descriptor); + } + + }