http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java new file mode 100644 index 0000000..4ad3138 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -0,0 +1,1037 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.FlowFileHandlingException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.junit.Assert; + +public class MockProcessSession implements ProcessSession { + + private final Map<Relationship, List<MockFlowFile>> transferMap = new ConcurrentHashMap<>(); + private final MockFlowFileQueue processorQueue; + private final Set<Long> beingProcessed = new HashSet<>(); + + private final Map<Long, MockFlowFile> currentVersions = new HashMap<>(); + private final Map<Long, MockFlowFile> originalVersions = new HashMap<>(); + private final SharedSessionState sharedState; + private final Map<String, Long> counterMap = new HashMap<>(); + private final MockProvenanceReporter provenanceReporter; + + private boolean committed = false; + private boolean rolledback = false; + private int removedCount = 0; + + public MockProcessSession(final SharedSessionState sharedState, final Processor processor) { + this.sharedState = sharedState; + this.processorQueue = sharedState.getFlowFileQueue(); + provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName()); + } + + @Override + public void adjustCounter(final String name, final long delta, final boolean immediate) { + if (immediate) { + sharedState.adjustCounter(name, delta); + return; + } + + Long counter = counterMap.get(name); + if (counter == null) { + counter = delta; + counterMap.put(name, counter); + return; + } + + counter = counter + delta; + counterMap.put(name, counter); + } + + @Override + public MockFlowFile clone(final FlowFile flowFile) { + validateState(flowFile); + final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public MockFlowFile clone(final FlowFile flowFile, final long offset, final long size) { + validateState(flowFile); + if (offset + size > flowFile.getSize()) { + throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + flowFile.toString()); + } + + final MockFlowFile newFlowFile = new MockFlowFile(sharedState.nextFlowFileId(), flowFile); + final byte[] newContent = Arrays.copyOfRange(((MockFlowFile) flowFile).getData(), (int) offset, (int) (offset + size)); + newFlowFile.setData(newContent); + + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public void commit() { + if (!beingProcessed.isEmpty()) { + throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed); + } + committed = true; + beingProcessed.clear(); + currentVersions.clear(); + originalVersions.clear(); + + for (final Map.Entry<String, Long> entry : counterMap.entrySet()) { + sharedState.adjustCounter(entry.getKey(), entry.getValue()); + } + + sharedState.addProvenanceEvents(provenanceReporter.getEvents()); + counterMap.clear(); + } + + /** + * Clear the 'committed' flag so that we can test that the next iteration of + * {@link nifi.processor.Processor#onTrigger} commits or rolls back the + * session + */ + public void clearCommited() { + committed = false; + } + + /** + * Clear the 'rolledBack' flag so that we can test that the next iteration + * of {@link nifi.processor.Processor#onTrigger} commits or rolls back the + * session + */ + public void clearRollback() { + rolledback = false; + } + + @Override + public MockFlowFile create() { + final MockFlowFile flowFile = new MockFlowFile(sharedState.nextFlowFileId()); + currentVersions.put(flowFile.getId(), flowFile); + beingProcessed.add(flowFile.getId()); + return flowFile; + } + + @Override + public MockFlowFile create(final FlowFile flowFile) { + MockFlowFile newFlowFile = create(); + newFlowFile = (MockFlowFile) inheritAttributes(flowFile, newFlowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public MockFlowFile create(final Collection<FlowFile> flowFiles) { + MockFlowFile newFlowFile = create(); + newFlowFile = (MockFlowFile) inheritAttributes(flowFiles, newFlowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + beingProcessed.add(newFlowFile.getId()); + return newFlowFile; + } + + @Override + public void exportTo(final FlowFile flowFile, final OutputStream out) { + validateState(flowFile); + if (flowFile == null || out == null) { + throw new IllegalArgumentException("arguments cannot be null"); + } + + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + + try { + out.write(mock.getData()); + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public void exportTo(final FlowFile flowFile, final Path path, final boolean append) { + validateState(flowFile); + if (flowFile == null || path == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + + final OpenOption mode = append ? StandardOpenOption.APPEND : StandardOpenOption.CREATE; + + try (final OutputStream out = Files.newOutputStream(path, mode)) { + out.write(mock.getData()); + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public MockFlowFile get() { + final MockFlowFile flowFile = processorQueue.poll(); + if (flowFile != null) { + beingProcessed.add(flowFile.getId()); + currentVersions.put(flowFile.getId(), flowFile); + originalVersions.put(flowFile.getId(), flowFile); + } + return flowFile; + } + + @Override + public List<FlowFile> get(final int maxResults) { + final List<FlowFile> flowFiles = new ArrayList<>(Math.min(500, maxResults)); + for (int i = 0; i < maxResults; i++) { + final MockFlowFile nextFlowFile = get(); + if (nextFlowFile == null) { + return flowFiles; + } + + flowFiles.add(nextFlowFile); + } + + return flowFiles; + } + + @Override + public List<FlowFile> get(final FlowFileFilter filter) { + final List<FlowFile> flowFiles = new ArrayList<>(); + final List<MockFlowFile> unselected = new ArrayList<>(); + + while (true) { + final MockFlowFile flowFile = processorQueue.poll(); + if (flowFile == null) { + break; + } + + final FlowFileFilter.FlowFileFilterResult filterResult = filter.filter(flowFile); + if (filterResult.isAccept()) { + flowFiles.add(flowFile); + + beingProcessed.add(flowFile.getId()); + currentVersions.put(flowFile.getId(), flowFile); + originalVersions.put(flowFile.getId(), flowFile); + } else { + unselected.add(flowFile); + } + + if (!filterResult.isContinue()) { + break; + } + } + + processorQueue.addAll(unselected); + return flowFiles; + } + + @Override + public QueueSize getQueueSize() { + return processorQueue.size(); + } + + @Override + public MockFlowFile importFrom(final InputStream in, final FlowFile flowFile) { + validateState(flowFile); + if (in == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + try { + final byte[] data = readFully(in); + newFlowFile.setData(data); + return newFlowFile; + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + } + + @Override + public MockFlowFile importFrom(final Path path, final boolean keepSourceFile, final FlowFile flowFile) { + validateState(flowFile); + if (path == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + Files.copy(path, baos); + } catch (final IOException e) { + throw new FlowFileAccessException(e.toString(), e); + } + + newFlowFile.setData(baos.toByteArray()); + newFlowFile = putAttribute(newFlowFile, CoreAttributes.FILENAME.key(), path.getFileName().toString()); + return newFlowFile; + } + + @Override + public MockFlowFile merge(final Collection<FlowFile> sources, final FlowFile destination) { + for (final FlowFile source : sources) { + validateState(source); + } + validateState(destination); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (final FlowFile flowFile : sources) { + final MockFlowFile mock = (MockFlowFile) flowFile; + final byte[] data = mock.getData(); + try { + baos.write(data); + } catch (final IOException e) { + throw new AssertionError("Failed to write to BAOS"); + } + } + + final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); + newFlowFile.setData(baos.toByteArray()); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + return newFlowFile; + } + + @Override + public MockFlowFile putAllAttributes(final FlowFile flowFile, final Map<String, String> attrs) { + validateState(flowFile); + if (attrs == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.putAttributes(attrs); + return newFlowFile; + } + + @Override + public MockFlowFile putAttribute(final FlowFile flowFile, final String attrName, final String attrValue) { + validateState(flowFile); + if (attrName == null || attrValue == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot update attributes of a flow file that I did not create"); + } + + if ("uuid".equals(attrName)) { + Assert.fail("Should not be attempting to set FlowFile UUID via putAttribute. This will be ignored in production"); + } + + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put(attrName, attrValue); + newFlowFile.putAttributes(attrs); + return newFlowFile; + } + + @Override + public void read(final FlowFile flowFile, final InputStreamCallback callback) { + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + + validateState(flowFile); + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData()); + try { + callback.process(bais); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + } + + @Override + public void remove(final FlowFile flowFile) { + validateState(flowFile); + final Iterator<Long> itr = beingProcessed.iterator(); + while (itr.hasNext()) { + final Long ffId = itr.next(); + if (ffId != null && ffId.equals(flowFile.getId())) { + itr.remove(); + beingProcessed.remove(ffId); + removedCount++; + currentVersions.remove(ffId); + return; + } + } + + throw new ProcessException(flowFile + " not found in queue"); + } + + @Override + public void remove(final Collection<FlowFile> flowFiles) { + for (final FlowFile flowFile : flowFiles) { + validateState(flowFile); + } + + for (final FlowFile flowFile : flowFiles) { + remove(flowFile); + } + } + + @Override + public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Set<String> attrNames) { + validateState(flowFile); + if (attrNames == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.removeAttributes(attrNames); + return newFlowFile; + } + + @Override + public MockFlowFile removeAllAttributes(final FlowFile flowFile, final Pattern keyPattern) { + validateState(flowFile); + if (flowFile == null) { + throw new IllegalArgumentException("flowFile cannot be null"); + } + if (keyPattern == null) { + return (MockFlowFile) flowFile; + } + + final Set<String> attrsToRemove = new HashSet<>(); + for (final String key : flowFile.getAttributes().keySet()) { + if (keyPattern.matcher(key).matches()) { + attrsToRemove.add(key); + } + } + + return removeAllAttributes(flowFile, attrsToRemove); + } + + @Override + public MockFlowFile removeAttribute(final FlowFile flowFile, final String attrName) { + validateState(flowFile); + if (attrName == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + final Set<String> attrNames = new HashSet<>(); + attrNames.add(attrName); + newFlowFile.removeAttributes(attrNames); + return newFlowFile; + } + + @Override + public void rollback() { + rollback(false); + } + + @Override + public void rollback(final boolean penalize) { + for (final List<MockFlowFile> list : transferMap.values()) { + for (final MockFlowFile flowFile : list) { + processorQueue.offer(flowFile); + } + } + + for (final Long flowFileId : beingProcessed) { + final MockFlowFile flowFile = originalVersions.get(flowFileId); + if (flowFile != null) { + processorQueue.offer(flowFile); + } + } + + rolledback = true; + beingProcessed.clear(); + currentVersions.clear(); + originalVersions.clear(); + transferMap.clear(); + clearTransferState(); + } + + @Override + public void transfer(final FlowFile flowFile) { + validateState(flowFile); + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("I only accept MockFlowFile"); + } + + beingProcessed.remove(flowFile.getId()); + processorQueue.offer((MockFlowFile) flowFile); + } + + @Override + public void transfer(final Collection<FlowFile> flowFiles) { + for (final FlowFile flowFile : flowFiles) { + transfer(flowFile); + } + } + + @Override + public void transfer(final FlowFile flowFile, final Relationship relationship) { + if (relationship == Relationship.SELF) { + transfer(flowFile); + return; + } + + validateState(flowFile); + List<MockFlowFile> list = transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + transferMap.put(relationship, list); + } + + beingProcessed.remove(flowFile.getId()); + list.add((MockFlowFile) flowFile); + } + + @Override + public void transfer(final Collection<FlowFile> flowFiles, final Relationship relationship) { + if (relationship == Relationship.SELF) { + transfer(flowFiles); + return; + } + + for (final FlowFile flowFile : flowFiles) { + validateState(flowFile); + } + + List<MockFlowFile> list = transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + transferMap.put(relationship, list); + } + + for (final FlowFile flowFile : flowFiles) { + beingProcessed.remove(flowFile.getId()); + list.add((MockFlowFile) flowFile); + } + } + + @Override + public MockFlowFile write(final FlowFile flowFile, final OutputStreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + callback.process(baos); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.setData(baos.toByteArray()); + return newFlowFile; + } + + @Override + public FlowFile append(final FlowFile flowFile, final OutputStreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + baos.write(mock.getData()); + callback.process(baos); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + newFlowFile.setData(baos.toByteArray()); + return newFlowFile; + } + + @Override + public MockFlowFile write(final FlowFile flowFile, final StreamCallback callback) { + validateState(flowFile); + if (callback == null || flowFile == null) { + throw new IllegalArgumentException("argument cannot be null"); + } + if (!(flowFile instanceof MockFlowFile)) { + throw new IllegalArgumentException("Cannot export a flow file that I did not create"); + } + final MockFlowFile mock = (MockFlowFile) flowFile; + + final ByteArrayInputStream in = new ByteArrayInputStream(mock.getData()); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + callback.process(in, out); + } catch (final IOException e) { + throw new ProcessException(e.toString(), e); + } + + final MockFlowFile newFlowFile = new MockFlowFile(mock.getId(), flowFile); + currentVersions.put(newFlowFile.getId(), newFlowFile); + newFlowFile.setData(out.toByteArray()); + + return newFlowFile; + } + + private byte[] readFully(final InputStream in) throws IOException { + final byte[] buffer = new byte[4096]; + int len; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + while ((len = in.read(buffer)) >= 0) { + baos.write(buffer, 0, len); + } + + return baos.toByteArray(); + } + + public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) { + List<MockFlowFile> list = this.transferMap.get(relationship); + if (list == null) { + list = new ArrayList<>(); + } + + return list; + } + + /** + * @param relationship to get flowfiles for + * @return a List of FlowFiles in the order in which they were transferred + * to the given relationship + */ + public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { + final Relationship procRel = new Relationship.Builder().name(relationship).build(); + return getFlowFilesForRelationship(procRel); + } + + public MockFlowFile createFlowFile(final File file) throws IOException { + return createFlowFile(Files.readAllBytes(file.toPath())); + } + + public MockFlowFile createFlowFile(final byte[] data) { + final MockFlowFile flowFile = create(); + flowFile.setData(data); + return flowFile; + } + + public MockFlowFile createFlowFile(final byte[] data, final Map<String, String> attrs) { + final MockFlowFile ff = createFlowFile(data); + ff.putAttributes(attrs); + return ff; + } + + @Override + public MockFlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) { + for (final FlowFile flowFile : sources) { + validateState(flowFile); + } + validateState(destination); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + if (header != null) { + baos.write(header); + } + + int count = 0; + for (final FlowFile flowFile : sources) { + baos.write(((MockFlowFile) flowFile).getData()); + if (demarcator != null && ++count != sources.size()) { + baos.write(demarcator); + } + } + + if (footer != null) { + baos.write(footer); + } + } catch (final IOException e) { + throw new AssertionError("failed to write data to BAOS"); + } + + final MockFlowFile newFlowFile = new MockFlowFile(destination.getId(), destination); + newFlowFile.setData(baos.toByteArray()); + currentVersions.put(newFlowFile.getId(), newFlowFile); + + return newFlowFile; + } + + private void validateState(final FlowFile flowFile) { + Objects.requireNonNull(flowFile); + final FlowFile currentVersion = currentVersions.get(flowFile.getId()); + if (currentVersion == null) { + throw new FlowFileHandlingException(flowFile + " is not known in this session"); + } + + if (currentVersion != flowFile) { + throw new FlowFileHandlingException(flowFile + " is not the most recent version of this flow file within this session"); + } + + for (final List<MockFlowFile> flowFiles : transferMap.values()) { + if (flowFiles.contains(flowFile)) { + throw new IllegalStateException(flowFile + " has already been transferred"); + } + } + } + + /** + * Inherits the attributes from the given source flow file into another flow + * file. The UUID of the source becomes the parent UUID of this flow file. + * If a parent uuid had previously been established it will be replaced by + * the uuid of the given source + * + * @param source the FlowFile from which to copy attributes + * @param destination the FlowFile to which to copy attributes + */ + private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) { + if (source == null || destination == null || source == destination) { + return destination; //don't need to inherit from ourselves + } + final FlowFile updated = putAllAttributes(destination, source.getAttributes()); + getProvenanceReporter().fork(source, Collections.singletonList(updated)); + return updated; + } + + /** + * Inherits the attributes from the given source flow files into the + * destination flow file. The UUIDs of the sources becomes the parent UUIDs + * of the destination flow file. Only attributes which is common to all + * source items is copied into this flow files attributes. Any previously + * established parent UUIDs will be replaced by the UUIDs of the sources. It + * will capture the uuid of a certain number of source objects and may not + * capture all of them. How many it will capture is unspecified. + * + * @param sources to inherit common attributes from + */ + private FlowFile inheritAttributes(final Collection<FlowFile> sources, final FlowFile destination) { + final StringBuilder parentUuidBuilder = new StringBuilder(); + int uuidsCaptured = 0; + for (final FlowFile source : sources) { + if (source == destination) { + continue; //don't want to capture parent uuid of this. Something can't be a child of itself + } + final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key()); + if (sourceUuid != null && !sourceUuid.trim().isEmpty()) { + uuidsCaptured++; + if (parentUuidBuilder.length() > 0) { + parentUuidBuilder.append(","); + } + parentUuidBuilder.append(sourceUuid); + } + + if (uuidsCaptured > 100) { + break; + } + } + + final FlowFile updated = putAllAttributes(destination, intersectAttributes(sources)); + getProvenanceReporter().join(sources, updated); + return updated; + } + + /** + * Returns the attributes that are common to every flow file given. The key + * and value must match exactly. + * + * @param flowFileList a list of flow files + * + * @return the common attributes + */ + private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) { + final Map<String, String> result = new HashMap<>(); + //trivial cases + if (flowFileList == null || flowFileList.isEmpty()) { + return result; + } else if (flowFileList.size() == 1) { + result.putAll(flowFileList.iterator().next().getAttributes()); + } + + /* + * Start with the first attribute map and only put an entry to the + * resultant map if it is common to every map. + */ + final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes(); + + outer: + for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { + final String key = mapEntry.getKey(); + final String value = mapEntry.getValue(); + for (final FlowFile flowFile : flowFileList) { + final Map<String, String> currMap = flowFile.getAttributes(); + final String curVal = currMap.get(key); + if (curVal == null || !curVal.equals(value)) { + continue outer; + } + } + result.put(key, value); + } + + return result; + } + + /** + * Assert that {@link #commit()} has been called + */ + public void assertCommitted() { + Assert.assertTrue("Session was not committed", committed); + } + + /** + * Assert that {@link #commit()} has not been called + */ + public void assertNotCommitted() { + Assert.assertFalse("Session was committed", committed); + } + + /** + * Assert that {@link #rollback()} has been called + */ + public void assertRolledBack() { + Assert.assertTrue("Session was not rolled back", rolledback); + } + + /** + * Assert that {@link #rollback()} has not been called + */ + public void assertNotRolledBack() { + Assert.assertFalse("Session was rolled back", rolledback); + } + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship to validate transfer count of + * @param count items transfer to given relationship + */ + public void assertTransferCount(final Relationship relationship, final int count) { + final int transferCount = getFlowFilesForRelationship(relationship).size(); + Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to " + + relationship + " but actual transfer count was " + transferCount, count, transferCount); + } + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship to validate transfer count of + * @param count items transfer to given relationship + */ + public void assertTransferCount(final String relationship, final int count) { + assertTransferCount(new Relationship.Builder().name(relationship).build(), count); + } + + /** + * Assert that there are no FlowFiles left on the input queue. + */ + public void assertQueueEmpty() { + Assert.assertTrue("FlowFile Queue has " + this.processorQueue.size() + " items", this.processorQueue.isEmpty()); + } + + /** + * Assert that at least one FlowFile is on the input queue + */ + public void assertQueueNotEmpty() { + Assert.assertFalse("FlowFile Queue is empty", this.processorQueue.isEmpty()); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship to check for transferred flow files + */ + public void assertAllFlowFilesTransferred(final String relationship) { + assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build()); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship to validate + */ + public void assertAllFlowFilesTransferred(final Relationship relationship) { + for (final Map.Entry<Relationship, List<MockFlowFile>> entry : transferMap.entrySet()) { + final Relationship rel = entry.getKey(); + final List<MockFlowFile> flowFiles = entry.getValue(); + + if (!rel.equals(relationship) && flowFiles != null && !flowFiles.isEmpty()) { + Assert.fail("Expected all Transferred FlowFiles to go to " + relationship + " but " + flowFiles.size() + " were routed to " + rel); + } + } + } + + /** + * Removes all state information about FlowFiles that have been transferred + */ + public void clearTransferState() { + this.transferMap.clear(); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship to validate + * @param count number of items sent to that relationship (expected) + */ + public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship to validate + * @param count number of items sent to that relationship (expected) + */ + public void assertAllFlowFilesTransferred(final String relationship, final int count) { + assertAllFlowFilesTransferred(new Relationship.Builder().name(relationship).build(), count); + } + + /** + * @return the number of FlowFiles that were removed + */ + public int getRemovedCount() { + return removedCount; + } + + @Override + public ProvenanceReporter getProvenanceReporter() { + return provenanceReporter; + } + + @Override + public MockFlowFile penalize(final FlowFile flowFile) { + validateState(flowFile); + final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; + mockFlowFile.setPenalized(); + return mockFlowFile; + } + + public byte[] getContentAsByteArray(final MockFlowFile flowFile) { + validateState(flowFile); + return flowFile.getData(); + } + + /** + * Checks if a FlowFile is known in this session. + * + * @param flowFile + * the FlowFile to check + * @return <code>true</code> if the FlowFile is known in this session, + * <code>false</code> otherwise. + */ + boolean isFlowFileKnown(final FlowFile flowFile) { + final FlowFile curFlowFile = currentVersions.get(flowFile.getId()); + if (curFlowFile == null) { + return false; + } + + final String curUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + final String providedUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + if (!curUuid.equals(providedUuid)) { + return false; + } + + return true; + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java new file mode 100644 index 0000000..2e5d3eb --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.Set; +import java.util.UUID; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; + +public class MockProcessorInitializationContext implements ProcessorInitializationContext, ControllerServiceLookup { + + private final ProcessorLog logger; + private final String processorId; + private final MockProcessContext context; + + public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) { + processorId = UUID.randomUUID().toString(); + logger = new MockProcessorLog(processorId, processor); + this.context = context; + } + + @Override + public String getIdentifier() { + return processorId; + } + + @Override + public ProcessorLog getLogger() { + return logger; + } + + @Override + public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { + return context.getControllerServiceIdentifiers(serviceType); + } + + @Override + public ControllerService getControllerService(final String identifier) { + return context.getControllerService(identifier); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return this; + } + + @Override + public String getControllerServiceName(String serviceIdentifier) { + return context.getControllerServiceName(serviceIdentifier); + } + + @Override + public boolean isControllerServiceEnabled(String serviceIdentifier) { + return context.isControllerServiceEnabled(serviceIdentifier); + } + + @Override + public boolean isControllerServiceEnabled(ControllerService service) { + return context.isControllerServiceEnabled(service); + } + + @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return context.isControllerServiceEnabling(serviceIdentifier); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java new file mode 100644 index 0000000..837784b --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorLog.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.logging.ProcessorLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockProcessorLog implements ProcessorLog { + + private final Logger logger; + private final Object component; + + public MockProcessorLog(final String componentId, final Object component) { + this.logger = LoggerFactory.getLogger(component.getClass()); + this.component = component; + } + + private Object[] addProcessor(final Object[] originalArgs) { + return prependToArgs(originalArgs, component); + } + + private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) { + final Object[] modifiedArgs = new Object[os.length + 2]; + modifiedArgs[0] = component.toString(); + for (int i = 0; i < os.length; i++) { + modifiedArgs[i + 1] = os[i]; + } + modifiedArgs[modifiedArgs.length - 1] = t.toString(); + + return modifiedArgs; + } + + private Object[] prependToArgs(final Object[] originalArgs, final Object... toAdd) { + final Object[] newArgs = new Object[originalArgs.length + toAdd.length]; + System.arraycopy(toAdd, 0, newArgs, 0, toAdd.length); + System.arraycopy(originalArgs, 0, newArgs, toAdd.length, originalArgs.length); + return newArgs; + } + + private Object[] translateException(final Object[] os) { + if (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)) { + final Object[] osCopy = new Object[os.length]; + osCopy[osCopy.length - 1] = os[os.length - 1].toString(); + System.arraycopy(os, 0, osCopy, 0, os.length - 1); + return osCopy; + } + return os; + } + + private boolean lastArgIsException(final Object[] os) { + return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)); + } + + @Override + public void warn(final String msg, final Throwable t) { + warn("{} " + msg, new Object[]{component}, t); + } + + @Override + public void warn(String msg, Object[] os) { + if (lastArgIsException(os)) { + warn(msg, translateException(os), (Throwable) os[os.length - 1]); + } else { + msg = "{} " + msg; + os = addProcessor(os); + logger.warn(msg, os); + } + } + + @Override + public void warn(String msg, Object[] os, final Throwable t) { + os = addProcessorAndThrowable(os, t); + msg = "{} " + msg + ": {}"; + + logger.warn(msg, os); + if (logger.isDebugEnabled()) { + logger.warn("", t); + } + } + + @Override + public void warn(String msg) { + msg = "{} " + msg; + logger.warn(msg, component); + } + + @Override + public void trace(String msg, Throwable t) { + msg = "{} " + msg; + final Object[] os = {component}; + logger.trace(msg, os, t); + } + + @Override + public void trace(String msg, Object[] os) { + msg = "{} " + msg; + os = addProcessor(os); + logger.trace(msg, os); + } + + @Override + public void trace(String msg) { + msg = "{} " + msg; + final Object[] os = {component}; + logger.trace(msg, os); + } + + @Override + public void trace(String msg, Object[] os, Throwable t) { + os = addProcessorAndThrowable(os, t); + msg = "{} " + msg + ": {}"; + + logger.trace(msg, os); + logger.trace("", t); + } + + @Override + public boolean isWarnEnabled() { + return logger.isWarnEnabled(); + } + + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + + @Override + public boolean isInfoEnabled() { + return logger.isInfoEnabled(); + } + + @Override + public boolean isErrorEnabled() { + return logger.isErrorEnabled(); + } + + @Override + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + + @Override + public void info(String msg, Throwable t) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.info(msg, os); + if (logger.isDebugEnabled()) { + logger.info("", t); + } + } + + @Override + public void info(String msg, Object[] os) { + msg = "{} " + msg; + os = addProcessor(os); + + logger.info(msg, os); + } + + @Override + public void info(String msg) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.info(msg, os); + } + + @Override + public void info(String msg, Object[] os, Throwable t) { + os = addProcessorAndThrowable(os, t); + msg = "{} " + msg + ": {}"; + + logger.info(msg, os); + if (logger.isDebugEnabled()) { + logger.info("", t); + } + } + + @Override + public String getName() { + return logger.getName(); + } + + @Override + public void error(String msg, Throwable t) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.error(msg, os, t); + if (logger.isDebugEnabled()) { + logger.error("", t); + } + } + + @Override + public void error(String msg, Object[] os) { + if (lastArgIsException(os)) { + error(msg, translateException(os), (Throwable) os[os.length - 1]); + } else { + os = addProcessor(os); + msg = "{} " + msg; + logger.error(msg, os); + } + } + + @Override + public void error(String msg) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.error(msg, os); + } + + @Override + public void error(String msg, Object[] os, Throwable t) { + os = addProcessorAndThrowable(os, t); + msg = "{} " + msg + ": {}"; + + logger.error(msg, os); + if (logger.isDebugEnabled()) { + logger.error("", t); + } + } + + @Override + public void debug(String msg, Throwable t) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.debug(msg, os, t); + } + + @Override + public void debug(String msg, Object[] os) { + os = addProcessor(os); + msg = "{} " + msg; + + logger.debug(msg, os); + } + + @Override + public void debug(String msg, Object[] os, Throwable t) { + os = addProcessorAndThrowable(os, t); + msg = "{} " + msg + ": {}"; + + logger.debug(msg, os); + if (logger.isDebugEnabled()) { + logger.debug("", t); + } + } + + @Override + public void debug(String msg) { + msg = "{} " + msg; + final Object[] os = {component}; + + logger.debug(msg, os); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java new file mode 100644 index 0000000..12436d4 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.exception.ProcessException; + +public class MockPropertyValue implements PropertyValue { + + private final String rawValue; + private final Boolean expectExpressions; + private final ControllerServiceLookup serviceLookup; + private final PropertyDescriptor propertyDescriptor; + private boolean expressionsEvaluated = false; + + public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup) { + this(rawValue, serviceLookup, null); + } + + public MockPropertyValue(final String rawValue, final ControllerServiceLookup serviceLookup, final PropertyDescriptor propertyDescriptor) { + this.rawValue = rawValue; + this.serviceLookup = serviceLookup; + this.expectExpressions = propertyDescriptor == null ? null : propertyDescriptor.isExpressionLanguageSupported(); + this.propertyDescriptor = propertyDescriptor; + } + + private void ensureExpressionsEvaluated() { + if (Boolean.TRUE.equals(expectExpressions) && !expressionsEvaluated) { + throw new IllegalStateException("Attempting to retrieve value of " + propertyDescriptor + + " without first evaluating Expressions, even though the PropertyDescriptor indicates " + + "that the Expression Language is Supported. If you realize that this is the case and do not want " + + "this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false)"); + } + } + + @Override + public String getValue() { + ensureExpressionsEvaluated(); + return rawValue; + } + + @Override + public Integer asInteger() { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : Integer.parseInt(rawValue.trim()); + } + + @Override + public Long asLong() { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : Long.parseLong(rawValue.trim()); + } + + @Override + public Boolean asBoolean() { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim()); + } + + @Override + public Float asFloat() { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : Float.parseFloat(rawValue.trim()); + } + + @Override + public Double asDouble() { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : Double.parseDouble(rawValue.trim()); + } + + @Override + public Long asTimePeriod(final TimeUnit timeUnit) { + ensureExpressionsEvaluated(); + return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit); + } + + @Override + public Double asDataSize(final DataUnit dataUnit) { + ensureExpressionsEvaluated(); + return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit); + } + + private void markEvaluated() { + if (Boolean.FALSE.equals(expectExpressions)) { + throw new IllegalStateException("Attempting to Evaluate Expressions but " + propertyDescriptor + + " indicates that the Expression Language is not supported. If you realize that this is the case and do not want " + + "this error to occur, it can be disabled by calling TestRunner.setValidateExpressionUsage(false)"); + } + expressionsEvaluated = true; + } + + @Override + public PropertyValue evaluateAttributeExpressions() throws ProcessException { + markEvaluated(); + if (rawValue == null) { + return this; + } + return evaluateAttributeExpressions(null, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException { + markEvaluated(); + if (rawValue == null) { + return this; + } + return evaluateAttributeExpressions(null, decorator); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException { + markEvaluated(); + if (rawValue == null) { + return this; + } + return evaluateAttributeExpressions(flowFile, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + markEvaluated(); + if (rawValue == null) { + return this; + } + return new MockPropertyValue(Query.evaluateExpressions(rawValue, flowFile, decorator), serviceLookup); + } + + @Override + public ControllerService asControllerService() { + ensureExpressionsEvaluated(); + if (rawValue == null || rawValue.equals("")) { + return null; + } + + return serviceLookup.getControllerService(rawValue); + } + + @Override + public <T extends ControllerService> T asControllerService(final Class<T> serviceType) throws IllegalArgumentException { + ensureExpressionsEvaluated(); + if (rawValue == null || rawValue.equals("")) { + return null; + } + + final ControllerService service = serviceLookup.getControllerService(rawValue); + if (serviceType.isAssignableFrom(service.getClass())) { + return serviceType.cast(service); + } + throw new IllegalArgumentException("Controller Service with identifier " + rawValue + " is of type " + service.getClass() + " and cannot be cast to " + serviceType); + } + + @Override + public boolean isSet() { + return rawValue != null; + } + + @Override + public String toString() { + return getValue(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java new file mode 100644 index 0000000..8c9a320 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileHandlingException; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockProvenanceReporter implements ProvenanceReporter { + private static final Logger logger = LoggerFactory.getLogger(MockProvenanceReporter.class); + private final MockProcessSession session; + private final String processorId; + private final String processorType; + private final SharedSessionState sharedSessionState; + private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>(); + + public MockProvenanceReporter(final MockProcessSession session, final SharedSessionState sharedState, final String processorId, final String processorType) { + this.session = session; + this.sharedSessionState = sharedState; + this.processorId = processorId; + this.processorType = processorType; + } + + private void verifyFlowFileKnown(final FlowFile flowFile) { + if (session != null && !session.isFlowFileKnown(flowFile)) { + throw new FlowFileHandlingException(flowFile + " is not known to " + session); + } + } + + Set<ProvenanceEventRecord> getEvents() { + return Collections.unmodifiableSet(events); + } + + /** + * Removes the given event from the reporter + * + * @param event + * event + */ + void remove(final ProvenanceEventRecord event) { + events.remove(event); + } + + void clear() { + events.clear(); + } + + /** + * Generates a Fork event for the given child and parents but does not + * register the event. This is useful so that a ProcessSession has the + * ability to de-dupe events, since one or more events may be created by the + * session itself, as well as by the Processor + * + * @param parents + * parents + * @param child + * child + * @return record + */ + ProvenanceEventRecord generateJoinEvent(final Collection<FlowFile> parents, final FlowFile child) { + final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); + eventBuilder.addChildFlowFile(child); + + for (final FlowFile parent : parents) { + eventBuilder.addParentFlowFile(parent); + } + + return eventBuilder.build(); + } + + ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final String details) { + return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build(); + } + + @Override + public void receive(final FlowFile flowFile, final String transitUri) { + receive(flowFile, transitUri, -1L); + } + + @Override + public void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier) { + receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, -1L); + } + + @Override + public void receive(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + receive(flowFile, transitUri, null, transmissionMillis); + } + + @Override + public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final long transmissionMillis) { + receive(flowFile, transitUri, sourceSystemFlowFileIdentifier, null, transmissionMillis); + } + + @Override + public void receive(final FlowFile flowFile, final String transitUri, final String sourceSystemFlowFileIdentifier, final String details, final long transmissionMillis) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) + .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis) { + send(flowFile, transitUri, transmissionMillis, true); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri) { + send(flowFile, transitUri, null, -1L, true); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final String details) { + send(flowFile, transitUri, details, -1L, true); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final long transmissionMillis, final boolean force) { + send(flowFile, transitUri, null, transmissionMillis, force); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final String details, final boolean force) { + send(flowFile, transitUri, details, -1L, force); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) { + send(flowFile, transitUri, details, transmissionMillis, true); + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); + if (force) { + sharedSessionState.addProvenanceEvents(Collections.singleton(record)); + } else { + events.add(record); + } + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void send(final FlowFile flowFile, final String transitUri, final boolean force) { + send(flowFile, transitUri, -1L, true); + } + + @Override + public void associate(final FlowFile flowFile, final String alternateIdentifierNamespace, final String alternateIdentifier) { + try { + String trimmedNamespace = alternateIdentifierNamespace.trim(); + if (trimmedNamespace.endsWith(":")) { + trimmedNamespace = trimmedNamespace.substring(0, trimmedNamespace.length() - 1); + } + + String trimmedIdentifier = alternateIdentifier.trim(); + if (trimmedIdentifier.startsWith(":")) { + if (trimmedIdentifier.length() == 1) { + throw new IllegalArgumentException("Illegal alternateIdentifier: " + alternateIdentifier); + } + trimmedIdentifier = trimmedIdentifier.substring(1); + } + + final String alternateIdentifierUri = trimmedNamespace + ":" + trimmedIdentifier; + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) { + try { + final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.DROP); + if (reason != null) { + builder.setDetails("Discard reason: " + reason); + } + final ProvenanceEventRecord record = builder.build(); + events.add(record); + return record; + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + return null; + } + } + + void expire(final FlowFile flowFile, final String details) { + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void fork(final FlowFile parent, final Collection<FlowFile> children) { + fork(parent, children, null, -1L); + } + + @Override + public void fork(final FlowFile parent, final Collection<FlowFile> children, final long forkDuration) { + fork(parent, children, null, forkDuration); + } + + @Override + public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details) { + fork(parent, children, details, -1L); + } + + @Override + public void fork(final FlowFile parent, final Collection<FlowFile> children, final String details, final long forkDuration) { + verifyFlowFileKnown(parent); + + try { + final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.FORK); + eventBuilder.addParentFlowFile(parent); + for (final FlowFile child : children) { + eventBuilder.addChildFlowFile(child); + } + + if (forkDuration > -1L) { + eventBuilder.setEventDuration(forkDuration); + } + + if (details != null) { + eventBuilder.setDetails(details); + } + + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void join(final Collection<FlowFile> parents, final FlowFile child) { + join(parents, child, null, -1L); + } + + @Override + public void join(final Collection<FlowFile> parents, final FlowFile child, final long joinDuration) { + join(parents, child, null, joinDuration); + } + + @Override + public void join(final Collection<FlowFile> parents, final FlowFile child, final String details) { + join(parents, child, details, -1L); + } + + @Override + public void join(final Collection<FlowFile> parents, final FlowFile child, final String details, final long joinDuration) { + verifyFlowFileKnown(child); + + try { + final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); + eventBuilder.addChildFlowFile(child); + eventBuilder.setDetails(details); + + for (final FlowFile parent : parents) { + eventBuilder.addParentFlowFile(parent); + } + + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void clone(final FlowFile parent, final FlowFile child) { + verifyFlowFileKnown(child); + + try { + final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); + eventBuilder.addChildFlowFile(child); + eventBuilder.addParentFlowFile(parent); + events.add(eventBuilder.build()); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void modifyContent(final FlowFile flowFile) { + modifyContent(flowFile, null, -1L); + } + + @Override + public void modifyContent(final FlowFile flowFile, final String details) { + modifyContent(flowFile, details, -1L); + } + + @Override + public void modifyContent(final FlowFile flowFile, final long processingMillis) { + modifyContent(flowFile, null, processingMillis); + } + + @Override + public void modifyContent(final FlowFile flowFile, final String details, final long processingMillis) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void modifyAttributes(final FlowFile flowFile) { + modifyAttributes(flowFile, null); + } + + @Override + public void modifyAttributes(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void route(final FlowFile flowFile, final Relationship relationship) { + route(flowFile, relationship, null); + } + + @Override + public void route(final FlowFile flowFile, final Relationship relationship, final long processingDuration) { + route(flowFile, relationship, null, processingDuration); + } + + @Override + public void route(final FlowFile flowFile, final Relationship relationship, final String details) { + route(flowFile, relationship, details, -1L); + } + + @Override + public void route(final FlowFile flowFile, final Relationship relationship, final String details, final long processingDuration) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + @Override + public void create(final FlowFile flowFile) { + create(flowFile, null); + } + + @Override + public void create(final FlowFile flowFile, final String details) { + verifyFlowFileKnown(flowFile); + + try { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build(); + events.add(record); + } catch (final Exception e) { + logger.error("Failed to generate Provenance Event due to " + e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventType(eventType); + builder.fromFlowFile(flowFile); + builder.setLineageStartDate(flowFile.getLineageStartDate()); + builder.setComponentId(processorId); + builder.setComponentType(processorType); + return builder; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java new file mode 100644 index 0000000..63a9876 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinFactory; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; + +public class MockReportingContext extends MockControllerServiceLookup implements ReportingContext, ControllerServiceLookup { + + private final Map<String, ControllerServiceConfiguration> controllerServices; + private final MockEventAccess eventAccess = new MockEventAccess(); + private final Map<PropertyDescriptor, String> properties = new HashMap<>(); + + private final Map<String, List<Bulletin>> componentBulletinsCreated = new HashMap<>(); + + public MockReportingContext(final Map<String, ControllerService> controllerServices) { + this.controllerServices = new HashMap<>(); + for (final Map.Entry<String, ControllerService> entry : controllerServices.entrySet()) { + this.controllerServices.put(entry.getKey(), new ControllerServiceConfiguration(entry.getValue())); + } + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + return Collections.unmodifiableMap(properties); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + final String configuredValue = properties.get(property); + return new MockPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this); + } + + public void setProperty(final String propertyName, final String value) { + this.properties.put(new PropertyDescriptor.Builder().name(propertyName).build(), value); + } + + public void setProperties(final Map<PropertyDescriptor, String> properties) { + this.properties.clear(); + this.properties.putAll(properties); + } + + @Override + public MockEventAccess getEventAccess() { + return eventAccess; + } + + @Override + public BulletinRepository getBulletinRepository() { + return new MockBulletinRepository(); + } + + @Override + public Bulletin createBulletin(final String category, final Severity severity, final String message) { + return BulletinFactory.createBulletin(category, severity.name(), message); + } + + @Override + public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(null, componentId, "test processor", category, severity.name(), message); + List<Bulletin> bulletins = componentBulletinsCreated.get(componentId); + if (bulletins == null) { + bulletins = new ArrayList<>(); + componentBulletinsCreated.put(componentId, bulletins); + } + bulletins.add(bulletin); + return bulletin; + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return this; + } + + /** + * @param componentId identifier of component to get bulletins for + * @return all Bulletins that have been created for the component with the + * given ID + */ + public List<Bulletin> getComponentBulletins(final String componentId) { + final List<Bulletin> created = componentBulletinsCreated.get(componentId); + if (created == null) { + return new ArrayList<>(); + } + + return new ArrayList<>(created); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java new file mode 100644 index 0000000..0aea00a --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.scheduling.SchedulingStrategy; + +public class MockReportingInitializationContext extends MockControllerServiceLookup implements ReportingInitializationContext, ControllerServiceLookup { + + private final String identifier; + private final String name; + private final Map<PropertyDescriptor, String> properties = new HashMap<>(); + private final ComponentLog logger; + + public MockReportingInitializationContext(final String identifier, final String name, final ComponentLog logger) { + this.identifier = identifier; + this.name = name; + this.logger = logger; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getName() { + return name; + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return 1L; + } + + public void setProperty(final String propertyName, final String value) { + setProperty(new PropertyDescriptor.Builder().name(propertyName).build(), value); + } + + public void setProperty(final PropertyDescriptor propertyName, final String value) { + this.properties.put(propertyName, value); + } + + public void setProperties(final Map<PropertyDescriptor, String> properties) { + this.properties.clear(); + this.properties.putAll(properties); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return this; + } + + @Override + public String getSchedulingPeriod() { + return "0 sec"; + } + + @Override + public SchedulingStrategy getSchedulingStrategy() { + return SchedulingStrategy.TIMER_DRIVEN; + } + + @Override + public ComponentLog getLogger() { + return logger; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java new file mode 100644 index 0000000..49b8796 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; + +public class MockSessionFactory implements ProcessSessionFactory { + + private final Processor processor; + private final SharedSessionState sharedState; + private final Set<MockProcessSession> createdSessions = new CopyOnWriteArraySet<>(); + + MockSessionFactory(final SharedSessionState sharedState, final Processor processor) { + this.sharedState = sharedState; + this.processor = processor; + } + + @Override + public ProcessSession createSession() { + final MockProcessSession session = new MockProcessSession(sharedState, processor); + createdSessions.add(session); + return session; + } + + Set<MockProcessSession> getCreatedSessions() { + return Collections.unmodifiableSet(createdSessions); + } +}