This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 4323572 NIFI-5318 Implement NiFi test harness: initial commit NIFI-5318 Implement NiFi test harness: replaced original sample feed payload with synthetic content NIFI-5318 Implement NiFi test harness: fixed test harness run crash issue; better reporting of paths NIFI-5318 Implement NiFi test harness: added further states where NiFi version can be queried NIFI-5318 Implement NiFi test harness: fixed incorrect class reference NIFI-5318 Implement NiFi test harness: added type param [...] 4323572 is described below commit 43235724e2031c30fc48e05be0d2136fd58efa52 Author: Peter G. Horvath <peter.gergely.horv...@gmail.com> AuthorDate: Sat Nov 10 22:22:01 2018 +0100 NIFI-5318 Implement NiFi test harness: initial commit NIFI-5318 Implement NiFi test harness: replaced original sample feed payload with synthetic content NIFI-5318 Implement NiFi test harness: fixed test harness run crash issue; better reporting of paths NIFI-5318 Implement NiFi test harness: added further states where NiFi version can be queried NIFI-5318 Implement NiFi test harness: fixed incorrect class reference NIFI-5318 Implement NiFi test harness: added type parameter bounding to setClassOfSingleProcessor to prevent configuring obviously incorrect classes NIFI-5318 Updated project version. 
This closes #3165 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> --- nifi-testharness/.gitignore | 1 + .../NIFI_TESTHARNESS_README.txt | 3 + nifi-testharness/pom.xml | 233 ++++++++ .../SimpleNiFiFlowDefinitionEditor.java | 203 +++++++ .../apache/nifi/testharness/TestNiFiInstance.java | 620 +++++++++++++++++++++ .../nifi/testharness/TestNiFiInstanceAware.java | 23 + .../testharness/api/FlowFileEditorCallback.java | 46 ++ .../apache/nifi/testharness/util/FileUtils.java | 88 +++ .../testharness/util/NiFiCoreLibClassLoader.java | 84 +++ .../org/apache/nifi/testharness/util/XmlUtils.java | 67 +++ .../java/org/apache/nifi/testharness/util/Zip.java | 134 +++++ .../apache/nifi/testharness/samples/Constants.java | 32 ++ .../nifi/testharness/samples/NiFiFlowTest.java | 157 ++++++ .../nifi/testharness/samples/NiFiMockFlowTest.java | 119 ++++ .../apache/nifi/testharness/samples/TestUtils.java | 57 ++ .../nifi/testharness/samples/mock/GetHTTPMock.java | 90 +++ .../testharness/samples/mock/MockProcessor.java | 101 ++++ nifi-testharness/src/test/resources/flow.xml | 154 +++++ .../src/test/resources/logback-test.xml | 15 + .../src/test/resources/sample_technology_rss.xml | 24 + pom.xml | 1 + 21 files changed, 2252 insertions(+) diff --git a/nifi-testharness/.gitignore b/nifi-testharness/.gitignore new file mode 100644 index 0000000..17dff51 --- /dev/null +++ b/nifi-testharness/.gitignore @@ -0,0 +1 @@ +nifi_testharness_nifi_home/* \ No newline at end of file diff --git a/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt new file mode 100644 index 0000000..e2d4da0 --- /dev/null +++ b/nifi-testharness/nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt @@ -0,0 +1,3 @@ +This directory is used to mimic NiFi's own home directory: the JVM hosting the +TestNiFiInstance has to be started here. 
Once started, TestNiFiInstance then +creates symlinks to the actual NiFi installation directory. \ No newline at end of file diff --git a/nifi-testharness/pom.xml b/nifi-testharness/pom.xml new file mode 100644 index 0000000..95cafc0 --- /dev/null +++ b/nifi-testharness/pom.xml @@ -0,0 +1,233 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi</artifactId> + <version>1.10.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-testharness</artifactId> + <description>A test harness for running NiFi flow tests</description> + <packaging>jar</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt</exclude> + <exclude>src/test/resources/sample_technology_rss.xml</exclude> + <exclude>src/test/resources/logback-test.xml</exclude> + <exclude>src/test/resources/flow.xml</exclude> + </excludes> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.20.1</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <workingDirectory>nifi_testharness_nifi_home</workingDirectory> + </configuration> + </plugin> + + </plugins> + </build> + + <profiles> + <profile> + <id>skip-testharness-tests</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + 
<java.awt.headless>true</java.awt.headless> + </systemPropertyVariables> + <excludes> + <exclude>**/samples/*Test.class</exclude> + <exclude>**/samples/Test*.class</exclude> + <exclude>**/samples/*Spec.class</exclude> + </excludes> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <argLine combine.children="append">-Xmx1G + -Djava.net.preferIPv4Stack=true + ${maven.surefire.arguments} + </argLine> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>run-testharness-tests</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <reuseForks>false</reuseForks> + <workingDirectory>${project.basedir}/nifi_testharness_nifi_home</workingDirectory> + <systemPropertyVariables> + <java.awt.headless>true</java.awt.headless> + </systemPropertyVariables> + <includes> + <include>**/*Test.class</include> + <include>**/Test*.class</include> + <include>**/*Spec.class</include> + </includes> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <argLine combine.children="append">-Xmx1G + -Djava.net.preferIPv4Stack=true + ${maven.surefire.arguments} + </argLine> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <org.slf4j.version>1.7.25</org.slf4j.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-runtime</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-assembly</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>pom</type> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + 
</dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-bootstrap</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api --> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + <version>3.1.0</version> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>${jetty.version}</version> + <scope>provided</scope> + </dependency> + + + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.4</version> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <version>3.2</version> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-easymock</artifactId> + <version>1.7.1</version> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.7.1</version> + <scope>test</scope> + </dependency> + + </dependencies> + + +</project> diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java new file mode 100644 index 0000000..27e6477 --- /dev/null +++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/SimpleNiFiFlowDefinitionEditor.java 
@@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.nifi.testharness; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.testharness.api.FlowFileEditorCallback; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathFactory; +import java.util.LinkedList; +import java.util.Objects; + + +/** + * <p> + * A facility to describe simple, common changes to a NiFi flow before it is installed to the test + * NiFi instance. Intended to be used by + * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} + * </p> + * + * <p> + * The desired edits can be configured via the {@link Builder} object returned by the {@link #builder()} + * method. Once fully configured, the {@link Builder#build()} emits a {@code FlowFileEditorCallback} + * object that can be passed to + * {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}. 
+ * </p> + * + * <p> + * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong> + * Efforts will be made to retain backwards API compatibility, but + * no guarantee is given. + * </p> + * + * @see TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback) + * + */ + +public final class SimpleNiFiFlowDefinitionEditor implements FlowFileEditorCallback, TestNiFiInstanceAware { + + + private final LinkedList<FlowFileEditorCallback> delegateActions; + private TestNiFiInstance testNiFiInstance; + + private SimpleNiFiFlowDefinitionEditor(LinkedList<FlowFileEditorCallback> delegateActions) { + this.delegateActions = delegateActions; + } + + @Override + public Document edit(Document document) throws Exception { + + for (FlowFileEditorCallback change : delegateActions) { + if (change instanceof TestNiFiInstanceAware) { + ((TestNiFiInstanceAware)change).setTestNiFiInstance(testNiFiInstance); + } + + document = change.edit(document); + } + + return document; + } + + @Override + public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) { + this.testNiFiInstance = Objects.requireNonNull( + testNiFiInstance, "argument testNiFiInstance cannot be null"); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private Builder() { + // no external instance + } + + private XPath xpath = XPathFactory.newInstance().newXPath(); + private final LinkedList<FlowFileEditorCallback> actions = new LinkedList<>(); + + public Builder rawXmlChange(FlowFileEditorCallback flowFileEditorCallback) { + actions.addLast(flowFileEditorCallback); + return this; + } + + public Builder setSingleProcessorProperty(String processorName, String propertyName, String newValue) { + + return rawXmlChange(document -> { + String xpathString = "//processor[name/text() = '" + processorName + + "']/property[name/text() = '" + propertyName + "']/value"; + + Node propertyValueNode = (Node) xpath.evaluate(xpathString, document, 
XPathConstants.NODE); + + if (propertyValueNode == null) { + throw new IllegalArgumentException("Reference to processor '"+ processorName +"' with property '" + + propertyName + "' not found: " + xpathString); + } + + propertyValueNode.setTextContent(newValue); + + return document; + }); + + + } + + public <P extends Processor> Builder setClassOfSingleProcessor(String processorName, Class<P> mockProcessor) { + + return setClassOfSingleProcessor(processorName, mockProcessor.getName()); + } + + public Builder setClassOfSingleProcessor(String processorName, String newFullyQualifiedClassName) { + + return rawXmlChange(document -> { + String xpathString = "//processor[name/text() = '" + processorName + "']/class"; + + Node classNameNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE); + + if (classNameNode == null) { + throw new IllegalArgumentException("Reference to processor '"+ processorName +"' not found: " + + xpathString); + } + /* overwrite the class element of the matched processor with the new fully qualified class name */ + classNameNode.setTextContent(newFullyQualifiedClassName); + + return document; + }); + } + + public Builder updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() { + + return rawXmlChange(new UpdateFlowFileNiFiVersionFlowFileEditorCallback()); + } + + + public SimpleNiFiFlowDefinitionEditor build() { + return new SimpleNiFiFlowDefinitionEditor(actions); + } + + } + + + private static final class UpdateFlowFileNiFiVersionFlowFileEditorCallback + implements FlowFileEditorCallback, TestNiFiInstanceAware { + + private TestNiFiInstance testNiFiInstance; + + @Override + public Document edit(Document document) throws Exception { + String niFiVersion = getNiFiVersion(); + + XPath xpath = XPathFactory.newInstance().newXPath(); + + NodeList processorNodeVersionList = (NodeList) + xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version", + document, XPathConstants.NODESET); + + final int length = processorNodeVersionList.getLength(); + for (int i=0; i<length; i++) { + Node processorNodeVersion = 
processorNodeVersionList.item(i); + + processorNodeVersion.setTextContent(niFiVersion); + } + + return document; + } + + private String getNiFiVersion() { + return Objects.requireNonNull( + testNiFiInstance, "testNiFiInstance cannot be null").getNifiVersion(); + } + + @Override + public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) { + this.testNiFiInstance = Objects.requireNonNull( + testNiFiInstance, "argument testNiFiInstance cannot be null"); + + } + + + } +} diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java new file mode 100644 index 0000000..e8a1fc6 --- /dev/null +++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstance.java @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + + +package org.apache.nifi.testharness; + +import org.apache.nifi.EmbeddedNiFi; +import org.apache.nifi.testharness.api.FlowFileEditorCallback; +import org.apache.nifi.testharness.util.FileUtils; +import org.apache.nifi.testharness.util.NiFiCoreLibClassLoader; +import org.apache.nifi.testharness.util.XmlUtils; +import org.apache.nifi.testharness.util.Zip; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.zip.ZipEntry; + +/** + * <p> + * An API wrapper of a "test" NiFi instance to which a flow definition is installed for testing.</p> + * + * <p> + * Due to NiFi design restrictions, {@code TestNiFiInstance} has to take <i>full command</i> + * of the current working directory: it installs a full NiFi installation to there. To ensure + * this is desired, <strong>it will only run if the current directory is called + * "nifi_testharness_nifi_home"</strong>. As such the JVM process has to be started inside a directory + * called "nifi_testharness_nifi_home" so that the following is true: + * + * <pre><tt> + * new File(System.getProperty("user.dir")).getName().equals("nifi_testharness_nifi_home") + * </tt></pre> + * </p> + * + * <p> + * Before {@code TestNiFiInstance} can be used, it has to be configured via its builder + * interface: + * <ul> + * <li> + * {@link Builder#setNiFiBinaryDistributionZip(File)} specifies the location of the NiFi binary + * distribution ZIP file to be used. 
+ * </li> + * <li> + * {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi flow + * to install. + * </li> + * <li> + * {@link Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} allows on-the-fly + * changes to be performed to the Flow file before it is actually installed. + * </li> + * </ul> + * + * <h5>Sample</h5> + * <pre><tt> + * TestNiFiInstance testNiFiInstance = TestNiFiInstance.builder() + * .setNiFiBinaryDistributionZip(YourConstants.NIFI_ZIP_FILE) + * .setFlowXmlToInstallForTesting(YourConstants.FLOW_XML_FILE) + * .modifyFlowXmlBeforeInstalling(YourConstants.FLOW_FILE_CHANGES_FOR_TESTS) + * .build(); + * </tt></pre> + * + * </p> + * + * <p> + * If the current working directory is called "nifi_testharness_nifi_home", the caller can + * {@link #install()} this {@code TestNiFiInstance}, which will + * <ol> + * <li> + * (as a first cleanup step) erase all content of the current working directory. + * (NOTE: this potentially destructive operation is the reason why we have the + * "nifi_testharness_nifi_home" directory name guard in place!) + * </li> + * <li> + * Extract the contents of the NiFi binary distribution ZIP file specified in + * the configuration to a temporary directory. + * <li> + * Symlink all files from the temporary directory to the current working + * directory, causing the directory to hold a fully functional + * NiFi installation. + * </li> + * <li> + * Install the flow definition file(s) to the NiFi instance specified in + * the configuration. + * </li> + * </ol> + * </p> + * + * <p> + * + * The caller then can proceed to {@link #start()} this {@code TestNiFiInstance}, + * which will bootstrap the NiFi engine, which in turn will pick up and start processing + * the flow definition supplied by the caller in the configuration. 
+ * </p> + * + * <p> + * Once the previous step is done, the caller can perform asserts regarding the observed behaviour + * of the NiFi flow, just like one would do it with standard Java test cases. + * </p> + * + * <p> + * To perform a clean shutdown of the hosted NiFi instance, the caller is required to call + * {@link #stopAndCleanup()}, which will shut down NiFi and remove all temporary files, including + * the symlinks created in the current working directory. + * </p> + * + * + * <h4>NOTES</h4> + * <ul> + * <li> + * {@code TestNiFiInstance} is NOT thread safe: if more than one thread uses it, + * external synchronisation is required. + * </li> + * <li> + * Only one {@code TestNiFiInstance} can be started in the same "nifi_testharness_nifi_home" + * directory at the same time. + * </li> + * <li> + * Currently, due to NiFi limitations, one {@code TestNiFiInstance} can be started per JVM process. + * If multiple test cases are required, launch a new JVM process per test case + * (in sequence, see the point above): Maven/Surefire has built-in support for this. + * </li> + * </ul> + * + * <p> + * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong> + * Efforts will be made to retain backwards API compatibility, but + * no guarantee is given. 
+ * </p> + * + * + * @see TestNiFiInstance#builder() + * + * + */ +public class TestNiFiInstance { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestNiFiInstance.class); + + + private EmbeddedNiFi testNiFi; + + private final File nifiHomeDir; + private final File bootstrapLibDir; + + private File nifiProperties; + + private final File flowXmlGz; + + private final File placeholderNiFiHomeDir; + + private String nifiVersion; + + + private enum State { + STOPPED, + STOP_FAILED, + START_FAILED(STOPPED), + STARTED(STOPPED, STOP_FAILED), + INSTALLATION_FAILED(), + FLOW_INSTALLED(STARTED, START_FAILED), + INSTALLED(FLOW_INSTALLED, INSTALLATION_FAILED), + CREATED(INSTALLED, INSTALLATION_FAILED); + + + private final Set<State> allowedTransitions; + + State(State... allowedTransitions) { + this.allowedTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedTransitions))); + } + + private void checkCanTransition(State newState) { + if (!this.allowedTransitions.contains(newState)) { + throw new IllegalStateException("Cannot transition from " + this + " to " + newState); + } + } + } + + private State currentState = State.CREATED; + + private final File nifiBinaryZip; + private final File flowXml; + private final FlowFileEditorCallback editCallback; + + private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallback editCallback) { + this.nifiBinaryZip = Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip"); + this.flowXml = Objects.requireNonNull(flowXml, "flowXml"); + this.editCallback = editCallback; + + nifiHomeDir = requireCurrentWorkingDirectoryIsCorrect(); + + final File configDir = new File(nifiHomeDir, "conf"); + final File libDir = new File(nifiHomeDir, "lib"); + + bootstrapLibDir = new File(libDir, "bootstrap"); + + nifiProperties = new File(configDir, "nifi.properties"); + + flowXmlGz = new File(configDir, "flow.xml.gz"); + + placeholderNiFiHomeDir = requireCurrentWorkingDirectoryIsCorrect(); + } + + String 
getNifiVersion() { + switch (currentState) { + case INSTALLED: + case FLOW_INSTALLED: + case STARTED: + case START_FAILED: + case STOP_FAILED: + case STOPPED: + + return Objects.requireNonNull(nifiVersion, "nifiVersion is null"); + + default: + throw new IllegalStateException( + "NiFi version can only be retrieved after a successful installation, not in: " + + currentState); + } + } + + public void install() throws IOException { + + currentState.checkCanTransition(State.INSTALLED); + + File[] staleInstallations = placeholderNiFiHomeDir.listFiles((dir, name) -> name.startsWith("nifi-")); + if (staleInstallations != null) { + Arrays.stream(staleInstallations).forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively); + } + + Path tempDirectory = null; + try { + tempDirectory = Files.createTempDirectory("installable-flow"); + + + + LOGGER.info("Uncompressing NiFi archive {} to {} ...", nifiBinaryZip, placeholderNiFiHomeDir); + + Zip.unzipFile(nifiBinaryZip, placeholderNiFiHomeDir, new Zip.StatusListenerAdapter() { + @Override + public void onUncompressDone(ZipEntry ze) { + LOGGER.debug("Uncompressed {}", ze.getName()); + } + }); + + LOGGER.info("Uncompressing DONE"); + + File actualNiFiHomeDir = getActualNiFiHomeDir(placeholderNiFiHomeDir); + + nifiVersion = getNiFiVersion(actualNiFiHomeDir); + + currentState = State.INSTALLED; + + File installableFlowFile = createInstallableFlowFile(tempDirectory); + + validateNiFiVersionAgainstFlowVersion(nifiVersion, installableFlowFile); + + FileUtils.createSymlinks(placeholderNiFiHomeDir, actualNiFiHomeDir); + + installFlowFile(installableFlowFile); + } catch (Exception e) { + + currentState = State.INSTALLATION_FAILED; + + throw new RuntimeException("Installation failed: " + e.getMessage(), e); + + } finally { + if (tempDirectory != null) { + FileUtils.deleteDirectoryRecursive(tempDirectory); + } + } + + currentState = State.FLOW_INSTALLED; + } + + private File createInstallableFlowFile(Path tempDirectory) throws IOException { 
+ + File flowXmlFile = new File(tempDirectory.toFile(), "flow.xml"); + + if (editCallback == null) { + Files.copy(flowXml.toPath(), flowXmlFile.toPath()); + } else { + if (editCallback instanceof TestNiFiInstanceAware) { + ((TestNiFiInstanceAware)editCallback).setTestNiFiInstance(this); + } + + XmlUtils.editXml(flowXml, flowXmlFile, editCallback); + } + + return flowXmlFile; + } + + private void installFlowFile(File fileToIncludeInGz) throws IOException { + Zip.gzipFile(fileToIncludeInGz, flowXmlGz); + } + + private static String getNiFiVersion(File nifiInstallDir) { + + File libDir = new File(nifiInstallDir, "lib"); + if (!libDir.exists()) { + throw new IllegalStateException( + "No \"lib\" directory found in NiFi home directory: " + nifiInstallDir); + } + + File[] nifiApiJarLookupResults = + libDir.listFiles((dir, name) -> name.startsWith("nifi-api-") && name.endsWith(".jar")); + + if (nifiApiJarLookupResults == null) { + // since we check the existence before, this can only be null in case of an I/O error + throw new IllegalStateException( + "I/O error listing NiFi lib directory: " + libDir); + } + + if (nifiApiJarLookupResults.length == 0) { + throw new IllegalStateException( + "No \"\"nifi-api-*.jar\" file found in NiFi lib directory: " + libDir); + } + + if (nifiApiJarLookupResults.length != 1) { + throw new IllegalStateException( + "Multiple \"nifi-api-*.jar\" files found in NiFi lib directory: " + libDir); + } + + File nifiApiJar = nifiApiJarLookupResults[0]; + + + return nifiApiJar.getName() + .replace("nifi-api-", "") + .replace(".jar", ""); + } + + private static void validateNiFiVersionAgainstFlowVersion(String nifiVersion, File flowFile) { + + String flowFileVersion = extractFlowFileVersion(flowFile); + + if (flowFileVersion != null + && !flowFileVersion.equalsIgnoreCase(nifiVersion)) { + + // prevent user errors and fail fast in case we detect that the flow file + // was created by a different version of NiFi. This can prevent a lot of confusion! 
+ + throw new RuntimeException(String.format( + "The NiFi version referenced in the flow file ('%s') does not match the version of NiFi being used ('%s')", + flowFileVersion, nifiVersion)); + } + } + + private static String extractFlowFileVersion(File flowFile) { + + Document flowDocument = XmlUtils.getFileAsDocument(flowFile); + + XPath xpath = XPathFactory.newInstance().newXPath(); + + try { + NodeList processorNodeVersion = (NodeList) + xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version/text()", + flowDocument, XPathConstants.NODESET); + + HashSet<String> versionNumbers = new HashSet<>(); + + final int length = processorNodeVersion.getLength(); + for (int i=0; i<length; i++) { + Node item = processorNodeVersion.item(i); + + String textContent = item.getTextContent(); + + versionNumbers.add(textContent); + } + + if (versionNumbers.size() == 0) { + return null; + } + + if (versionNumbers.size() > 1) { + throw new RuntimeException( + "Multiple NiFi versions found in Flow file, this is unexpected: " + versionNumbers); + } + + return versionNumbers.iterator().next(); + + } catch (XPathExpressionException e) { + throw new RuntimeException("Failure extracting version information from flow file: " + flowFile, e); + } + } + + + public void start() { + + currentState.checkCanTransition(State.STARTED); + + try { + if (!bootstrapLibDir.exists()) { + throw new IllegalStateException("Not found: " + bootstrapLibDir); + } + + + + System.setProperty("org.apache.jasper.compiler.disablejsr199", "true"); + System.setProperty("java.security.egd", "file:/dev/urandom"); + System.setProperty("sun.net.http.allowRestrictedHeaders", "true"); + System.setProperty("java.net.preferIPv4Stack", "true"); + System.setProperty("java.awt.headless", "true"); + System.setProperty("java.protocol.handler.pkgs", "sun.net.www.protocol"); + + System.setProperty("nifi.properties.file.path", nifiProperties.getAbsolutePath()); + System.setProperty("app", "NiFi"); + 
System.setProperty("org.apache.nifi.bootstrap.config.log.dir", "./logs"); + + ClassLoader coreClassLoader = new NiFiCoreLibClassLoader(nifiHomeDir, ClassLoader.getSystemClassLoader()); + Thread.currentThread().setContextClassLoader(coreClassLoader); + + + + this.testNiFi = new EmbeddedNiFi(new String[0], coreClassLoader); + + } catch (Exception ex) { + + currentState = State.START_FAILED; + + throw new RuntimeException("Startup failed", ex); + + } + + currentState = State.STARTED; + + + } + + + public void stopAndCleanup() { + currentState.checkCanTransition(State.STOPPED); + + try { + /* testNiFi is still null when start() failed before EmbeddedNiFi was created; the cleanup below must run regardless */ + if (testNiFi != null) { testNiFi.shutdown(); } + removeNiFiFilesCreatedForTemporaryInstallation(placeholderNiFiHomeDir); + + } catch (Exception e) { + currentState = State.STOP_FAILED; + + throw new RuntimeException(e); + } + + currentState = State.STOPPED; + } + + private static File requireCurrentWorkingDirectoryIsCorrect() { + + File currentWorkDir = new File(System.getProperty("user.dir")); + if (!currentWorkDir.getName().equals("nifi_testharness_nifi_home")) { + + throw new IllegalStateException( + "The test's working directory has to be set to nifi_testharness_nifi_home, but was: " + currentWorkDir); + } + return currentWorkDir; + } + + private static File getActualNiFiHomeDir(File currentDir) { + File[] files = currentDir.listFiles((dir, name) -> name.startsWith("nifi-")); + + if (files == null || files.length == 0) { + throw new IllegalStateException( + "No \"nifi-*\" directory found in temporary NiFi home directory container: " + currentDir); + } + + if (files.length != 1) { + throw new IllegalStateException( + "Multiple \"nifi-*\" directories found in temporary NiFi home directory container: " + currentDir); + } + + return files[0]; + } + + private static void removeNiFiFilesCreatedForTemporaryInstallation(File directoryToClear) { + + if (directoryToClear != null) { + File[] directoryContents = directoryToClear.listFiles(); + if (directoryContents != null) { + Arrays.stream(directoryContents) + 
.filter(file -> !"NIFI_TESTHARNESS_README.txt".equals(file.getName())) + .forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively); + } + } + } + + private static void deleteFileOrDirectoryRecursively(File file) { + if (file.isDirectory()) { + FileUtils.deleteDirectoryRecursive(file); + } else { + boolean deletedSuccessfully = file.delete(); + if (!deletedSuccessfully) { + throw new RuntimeException("Could not delete: " + file); + } + } + } + + @Override + public String toString() { + return "NiFi test instance(" + Integer.toHexString(hashCode()) + + ") state: " + currentState + ", home: " + nifiHomeDir; + } + + public static Builder builder() { + return new Builder(); + } + + + public static class Builder { + + private boolean isDisposed = false; + + private File nifiBinaryZip; + private File flowXml; + private FlowFileEditorCallback editCallback; + + /** + * Sets the location of the NiFi binary distribution file, from which the test instance + * will be uncompressed and built. + * + * @param nifiBinaryZip + * the NiFi binary distribution file, from which the test instance will be built (never {@code null}) + * @return {@code this} (for method chaining) + */ + public Builder setNiFiBinaryDistributionZip(File nifiBinaryZip) { + if (!nifiBinaryZip.exists()) { + throw new IllegalArgumentException("File not found: " + nifiBinaryZip); + } + + if (nifiBinaryZip.isDirectory()) { + throw new IllegalArgumentException("A ZIP file is expected to be specified, not a directory: " + + nifiBinaryZip); + } + + this.nifiBinaryZip = nifiBinaryZip; + return this; + } + + /** + * Sets the NiFi flow XML, which will be installed to the NiFi instance for testing. 
+ * + * @param flowXml the NiFi flow file to install to the test instance for testing (never {@code null}) + * + * @return {@code this} (for method chaining) + */ + public Builder setFlowXmlToInstallForTesting(File flowXml) { + if (!flowXml.exists()) { + throw new IllegalArgumentException("File not found: " + flowXml); + } + + this.flowXml = flowXml; + return this; + } + + /** + * <p> + * An <strong>optional</strong> callback to change the flow definition read from + * {@link #setFlowXmlToInstallForTesting(File)}, before it is actually installed for testing. + * (NOTE: The original file remains unchanged: changes are applied to a copy of it.)</p> + * + * <p> + * NOTE: {@link SimpleNiFiFlowDefinitionEditor} provides various common flow definition changes + * useful for testing. + * </p> + * + * @param callback an <strong>optional</strong> callback to change the flow definition + * + * @return {@code this} (for method chaining) + * + * @see SimpleNiFiFlowDefinitionEditor + */ + public Builder modifyFlowXmlBeforeInstalling(FlowFileEditorCallback callback) { + this.editCallback = callback; + return this; + } + + + + public TestNiFiInstance build() { + if (isDisposed) { + throw new IllegalStateException("builder can only be used once"); + } + isDisposed = true; + + return new TestNiFiInstance(nifiBinaryZip, flowXml, editCallback); + } + + + } + + +} diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java new file mode 100644 index 0000000..31889ed --- /dev/null +++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/TestNiFiInstanceAware.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
package org.apache.nifi.testharness;

/**
 * Contract for objects that can be handed a {@link TestNiFiInstance} through a setter.
 *
 * <p>NOTE(review): presumably implemented by test-side components that need access to
 * the instance they run in; no implementor or caller is visible here, so confirm before relying on it.</p>
 */
public interface TestNiFiInstanceAware {
    /**
     * Supplies the test instance to this object.
     *
     * @param testNiFiInstance the instance to make available to the implementor
     */
    void setTestNiFiInstance(TestNiFiInstance testNiFiInstance);
}
package org.apache.nifi.testharness.api;

import org.w3c.dom.Document;

/**
 * <p>
 * An interface that allows programmatic access to the contents of a NiFi Flow XML,
 * allowing changes to be performed before it
 * is actually installed to the NiFi instance.</p>
 *
 * <p>
 * <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
 * Efforts will be made to retain backwards API compatibility, but
 * no guarantee is given.
 * </p>
 *
 */
public interface FlowFileEditorCallback {

    /**
     * Applies changes to the parsed flow definition.
     *
     * <p>The returned document is the one that gets used afterwards, so an implementation
     * may modify the passed document in place and return it, or return a different document.</p>
     *
     * @param document the document to change (never {@code null})
     * @return the changed document (never {@code null})
     * @throws Exception in case the editing fails
     */
    Document edit(Document document) throws Exception;
}
/**
 * File-system helpers of the test harness: recursive deletion and symbolic link creation.
 */
public final class FileUtils {


    /** Finder metadata file on macOS; never linked by {@link #createSymlinks(File, File)}. */
    private static final String MAC_DS_STORE_NAME = ".DS_Store";

    private FileUtils() {
        // no instances
    }

    /**
     * Deletes a directory together with everything below it.
     * Symbolic links are deleted themselves; their targets are not followed.
     *
     * @param directory the root of the tree to delete
     * @throws IOException if an entry cannot be visited or deleted
     */
    public static void deleteDirectoryRecursive(Path directory) throws IOException {
        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                // If listing the directory failed, report that root cause instead of the
                // follow-up "directory not empty" error the delete below would produce.
                if (exc != null) {
                    throw exc;
                }
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /**
     * Unchecked variant of {@link #deleteDirectoryRecursive(Path)}.
     *
     * @param dir the root of the tree to delete
     * @throws RuntimeException wrapping the {@link IOException} if deletion fails
     */
    public static void deleteDirectoryRecursive(File dir) {
        try {
            deleteDirectoryRecursive(dir.toPath());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a symbolic link.
     *
     * @param newLink the path of the link to create
     * @param existingFile the target the link points to
     * @throws RuntimeException wrapping the {@link IOException} if the link cannot be created
     */
    public static void createLink(Path newLink, Path existingFile) {
        try {
            Files.createSymbolicLink(newLink, existingFile);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates, inside {@code newLinkDir}, one symbolic link for every entry of {@code existingDir}
     * (except {@code .DS_Store}), replacing any entry of the same name that is already there.
     *
     * @param newLinkDir the directory in which the links are created
     * @param existingDir the directory whose entries are linked
     * @throws IllegalArgumentException if {@code existingDir} cannot be listed
     * @throws RuntimeException if an existing entry cannot be replaced or a link cannot be created
     */
    public static void createSymlinks(File newLinkDir, File existingDir) {
        String[] fileNames = existingDir.list();
        if (fileNames == null) {
            // File.list() returns null for non-directories and on I/O errors.
            throw new IllegalArgumentException("Not a directory or I/O error reading: " + existingDir);
        }

        for (String fileName : fileNames) {
            if (MAC_DS_STORE_NAME.equals(fileName)) {
                continue;
            }

            Path newLink = Paths.get(newLinkDir.getAbsolutePath(), fileName);
            Path existingFile = Paths.get(existingDir.getAbsolutePath(), fileName);

            try {
                // deleteIfExists() does not follow links, so a dangling symbolic link left over
                // from an earlier run is removed as well (File.exists() would report it as absent).
                Files.deleteIfExists(newLink);
            } catch (IOException e) {
                throw new RuntimeException("Could not replace existing entry: " + newLink, e);
            }

            createLink(newLink, existingFile);
        }
    }
}
b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java new file mode 100644 index 0000000..a3af363 --- /dev/null +++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/NiFiCoreLibClassLoader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A {@link URLClassLoader} whose class path consists of the {@code .jar} files found directly in
 * {@code <nifiHome>/lib} and {@code <nifiHome>/lib/bootstrap}.
 * Other files in those directories (for example {@code .nar} archives) and sub-directories are ignored.
 */
public final class NiFiCoreLibClassLoader extends URLClassLoader {


    /**
     * @param nifiHomeDir the NiFi home directory; must contain {@code lib} and {@code lib/bootstrap}
     * @param parent the parent class loader
     * @throws RuntimeException if either library directory cannot be listed
     */
    public NiFiCoreLibClassLoader(File nifiHomeDir, ClassLoader parent) {
        super(getUrls(nifiHomeDir), parent);
    }

    /** Collects the jar URLs of {@code lib} first, then those of {@code lib/bootstrap}. */
    private static URL[] getUrls(File nifiHomeDir) {

        File libDir = new File(nifiHomeDir, "lib");
        File bootstrapLibDir = new File(libDir, "bootstrap");

        try {
            List<URL> urls = new LinkedList<>();
            urls.addAll(listJarUrls(libDir));
            urls.addAll(listJarUrls(bootstrapLibDir));

            return urls.toArray(new URL[0]);
        } catch (IOException ioEx) {
            throw new RuntimeException(ioEx);
        }
    }

    /** Lists the jar files located directly in the given directory as URLs. */
    private static List<URL> listJarUrls(File directory) throws IOException {
        // Files.list() holds an open directory handle until the stream is closed,
        // hence the try-with-resources.
        try (java.util.stream.Stream<Path> entries = Files.list(directory.toPath())) {
            return entries
                    .filter(NiFiCoreLibClassLoader::isJarFile)
                    .map(NiFiCoreLibClassLoader::toURL)
                    .collect(Collectors.toList());
        }
    }

    private static URL toURL(Path path) {
        try {
            return path.toUri().toURL();
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /** Accepts regular files whose name ends with {@code .jar}. */
    private static boolean isJarFile(Path path) {
        String fileName = path.getFileName().toString();

        return path.toFile().isFile() && fileName.endsWith(".jar");
    }

}
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.nifi.testharness.util; + +import org.apache.nifi.testharness.api.FlowFileEditorCallback; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import java.io.File; +import java.io.FileInputStream; + +public final class XmlUtils { + + public static void editXml(File inputFile, File outputFile, FlowFileEditorCallback editCallback) { + + try { + Document document = getFileAsDocument(inputFile); + + document = editCallback.edit(document); + + // save the result + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + transformer.transform(new DOMSource(document), new StreamResult(outputFile)); + + } catch (Exception e) { + throw new RuntimeException("Failed to change XML document: " + e.getMessage(), e); + } + } + + public static Document getFileAsDocument(File xmlFile) { + try(FileInputStream inputStream 
= new FileInputStream(xmlFile)) { + + DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder(); + + return documentBuilder.parse(new InputSource(inputStream)); + + } catch (Exception e) { + throw new RuntimeException("Failed to parse XML file: " + xmlFile, e); + } + } + +} \ No newline at end of file diff --git a/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java new file mode 100644 index 0000000..12ea403 --- /dev/null +++ b/nifi-testharness/src/main/java/org/apache/nifi/testharness/util/Zip.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * ZIP extraction and GZIP compression helpers of the test harness.
 */
public final class Zip {

    private Zip() {
        // no external instances allowed
    }


    /** Receives a notification before and after each file entry is extracted. */
    public interface StatusListener {
        void onUncompressStarted(ZipEntry ze);

        void onUncompressDone(ZipEntry ze);
    }

    /** No-op {@link StatusListener}; extend it to override only the callbacks of interest. */
    public static class StatusListenerAdapter implements StatusListener {

        @Override
        public void onUncompressStarted(ZipEntry ze) {

        }

        @Override
        public void onUncompressDone(ZipEntry ze) {

        }
    }

    private static final StatusListener NO_OP_STATUS_LISTENER = new StatusListenerAdapter();

    /**
     * Extracts a ZIP file without progress reporting.
     *
     * @see #unzipFile(File, File, StatusListener)
     */
    public static void unzipFile(File zipFile, File targetDirectory) throws IOException {
        unzipFile(zipFile, targetDirectory, NO_OP_STATUS_LISTENER);
    }


    /**
     * Extracts all file entries of a ZIP file below {@code targetDirectory}, creating the target
     * directory and any parent directories as needed. Directory entries are skipped, so empty
     * directories of the archive are not created. Existing files are not overwritten:
     * extraction fails if a file to be written already exists.
     *
     * @param zipFile the archive to extract
     * @param targetDirectory the directory to extract into
     * @param statusListener notified before and after each extracted file
     * @throws IOException if a directory cannot be created as the target, an entry would be written
     *         outside of {@code targetDirectory}, or reading/writing fails
     * @throws IllegalStateException if a parent directory of an entry cannot be created
     */
    public static void unzipFile(File zipFile, File targetDirectory,
                                 StatusListener statusListener) throws IOException {

        if (!targetDirectory.exists()) {
            boolean mkdirs = targetDirectory.mkdirs();
            if (!mkdirs) {
                throw new IOException("Failed to create directory: " + targetDirectory);
            }
        }

        final File canonicalTargetDirectory = targetDirectory.getCanonicalFile();

        try (ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile))) {

            ZipEntry ze;
            while ((ze = zipInputStream.getNextEntry()) != null) {

                if (ze.isDirectory()) {
                    continue;
                }

                File outputFile = new File(targetDirectory, ze.getName());

                // Guard against "Zip Slip": an entry name such as "../../x" must not be able
                // to escape the target directory.
                if (!outputFile.getCanonicalFile().toPath().startsWith(canonicalTargetDirectory.toPath())) {
                    throw new IOException("ZIP entry is outside of the target directory: " + ze.getName());
                }

                statusListener.onUncompressStarted(ze);

                File parentDir = outputFile.getParentFile();
                if (!parentDir.exists()) {
                    boolean couldCreateParentDir = parentDir.mkdirs();
                    if (!couldCreateParentDir) {
                        throw new IllegalStateException("Could not create: " + parentDir);
                    }
                }

                // Reads up to the end of the current entry only; the stream stays open.
                Files.copy(zipInputStream, outputFile.toPath());

                statusListener.onUncompressDone(ze);
            }
        }
    }


    /**
     * Compresses a file with GZIP.
     *
     * @param inputFile the file to compress
     * @param gzipFile the compressed file to create (overwritten if it exists)
     * @throws IOException if reading or writing fails
     */
    public static void gzipFile(File inputFile, File gzipFile) throws IOException {

        try (GZIPOutputStream gzos =
                     new GZIPOutputStream(new FileOutputStream(gzipFile))) {

            Files.copy(inputFile.toPath(), gzos);

            gzos.finish();
        }
    }

}
+ static final File NIFI_ZIP_DIR = new File("../../nifi-assembly/target"); + + static final File FLOW_XML_FILE = new File(NiFiMockFlowTest.class.getResource("/flow.xml").getFile()); +} diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java new file mode 100644 index 0000000..9e13db7 --- /dev/null +++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiFlowTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.nifi.testharness.samples; + + + +import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor; +import org.apache.nifi.testharness.TestNiFiInstance; +import org.apache.nifi.testharness.util.FileUtils; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.Assert.assertTrue; + + +/** + * This test demonstrates how to mock the source data by starting a mock HTTP server (using Jetty) + * and rewriting the URL in flow definition. 
+ */ +public class NiFiFlowTest { + + private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder() + .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() + .setSingleProcessorProperty("GetHTTP", "URL", "http://localhost:12345") + .build(); + + // used by mocked GetHTTP; serves test data + private static Server testJettyServer; + + private TestNiFiInstance testNiFiInstance; + + + @BeforeClass + public static void beforeClass() throws Exception { + NiFiFlowTest.testJettyServer = new Server(12345); + + + Handler handler = new TestHandler(); + NiFiFlowTest.testJettyServer.setHandler(handler); + NiFiFlowTest.testJettyServer.start(); + } + + + @Before + public void bootstrapNiFi() throws Exception { + + if (Constants.OUTPUT_DIR.exists()) { + FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath()); + } + + File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR); + + TestNiFiInstance testNiFi = TestNiFiInstance.builder() + .setNiFiBinaryDistributionZip(nifiZipFile) + .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE) + .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW) + .build(); + + testNiFi.install(); + testNiFi.start(); + + // only assign testNiFi to the field in case it was started successfully + testNiFiInstance = testNiFi; + } + + @Test + public void testFlowCreatesFilesInCorrectLocation() throws IOException { + + // We deleted the output directory: our NiFi flow should create it + + assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists()); + + File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml"); + + assertTrue("Output file not found: " + outputFile, outputFile.exists()); + + List<String> strings = Files.readAllLines(outputFile.toPath()); + + boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi")); + + assertTrue("There was no line 
containing NiFi", atLeastOneLineContainsNiFi); + + boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version")); + + assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion); + + } + + @After + public void shutdownNiFi() { + + if (testNiFiInstance != null) { + testNiFiInstance.stopAndCleanup(); + } + } + + @AfterClass + public static void afterClass() throws Exception { + NiFiFlowTest.testJettyServer.stop(); + } + + + private static class TestHandler extends org.eclipse.jetty.server.handler.AbstractHandler { + @Override + public void handle( + String target, + Request baseRequest, + HttpServletRequest httpServletRequest, + HttpServletResponse response) throws IOException, ServletException { + + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + baseRequest.setHandled(true); + + InputStream resource = TestHandler.class.getResourceAsStream("/sample_technology_rss.xml"); + ServletOutputStream outputStream = response.getOutputStream(); + + byte[] buffer = new byte[1024]; + int len; + while ((len = resource.read(buffer)) != -1) { + outputStream.write(buffer, 0, len); + } + } + } +} diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java new file mode 100644 index 0000000..c5a7139 --- /dev/null +++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/NiFiMockFlowTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.nifi.testharness.samples; + + +import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor; +import org.apache.nifi.testharness.TestNiFiInstance; +import org.apache.nifi.testharness.samples.mock.GetHTTPMock; +import org.apache.nifi.testharness.util.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.Assert.assertTrue; + + +/** + * This test demonstrates how to mock the source data by mocking the processor + * itself in the flow definition. + */ +public class NiFiMockFlowTest { + + private static final InputStream DEMO_DATA_AS_STREAM = + NiFiMockFlowTest.class.getResourceAsStream("/sample_technology_rss.xml"); + + + // We have a dedicated class. It has to be public static + // so that NiFi engine can instantiate it. 
+ public static class MockedGetHTTP extends GetHTTPMock { + + public MockedGetHTTP() { + super("text/xml; charset=utf-8", () -> DEMO_DATA_AS_STREAM); + } + } + + + private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder() + .updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() + .setClassOfSingleProcessor("GetHTTP", MockedGetHTTP.class) + .build(); + + + private TestNiFiInstance testNiFiInstance; + + @Before + public void bootstrapNiFi() throws Exception { + + if (Constants.OUTPUT_DIR.exists()) { + FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath()); + } + + File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR); + + TestNiFiInstance testNiFi = TestNiFiInstance.builder() + .setNiFiBinaryDistributionZip(nifiZipFile) + .setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE) + .modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW) + .build(); + + testNiFi.install(); + testNiFi.start(); + + // only assign testNiFi to the field in case it was started successfully + testNiFiInstance = testNiFi; + } + + @Test + public void testFlowCreatesFilesInCorrectLocation() throws IOException { + + // We deleted the output directory: our NiFi flow should create it + + assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists()); + + File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml"); + + assertTrue("Output file not found: " + outputFile, outputFile.exists()); + + List<String> strings = Files.readAllLines(outputFile.toPath()); + + boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi")); + + assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi); + + boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version")); + + assertTrue("There was no line containing 'latest NiFi 
version'", atLeastOneLineContainsNiFiVersion); + + } + + @After + public void shutdownNiFi() { + + if (testNiFiInstance != null) { + testNiFiInstance.stopAndCleanup(); + } + } +} diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java new file mode 100644 index 0000000..7d0b633 --- /dev/null +++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/TestUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Helpers shared by the sample tests.
 */
final class TestUtils {

    private TestUtils() {
        // no instances allowed
    }

    /**
     * Finds the one NiFi binary distribution archive ({@code nifi-*-bin.zip}) in a directory.
     *
     * @param binaryDistributionZipDir the directory to search (not searched recursively)
     * @return the only matching file
     * @throws IllegalStateException if the directory does not exist or cannot be listed,
     *         or if it contains no matching file or more than one
     */
    static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {

        final String location = binaryDistributionZipDir.getAbsolutePath();

        if (!binaryDistributionZipDir.exists()) {
            throw new IllegalStateException(
                    "NiFi distribution ZIP file not found at the expected location: " + location);
        }

        final File[] candidates = binaryDistributionZipDir.listFiles(
                (parent, fileName) -> fileName.startsWith("nifi-") && fileName.endsWith("-bin.zip"));

        // listFiles() yields null rather than an empty array when the listing itself fails
        if (candidates == null) {
            throw new IllegalStateException("Not a directory or I/O error reading: " + location);
        }

        switch (candidates.length) {
            case 0:
                throw new IllegalStateException("No NiFi distribution ZIP file is found in: " + location);
            case 1:
                return candidates[0];
            default:
                throw new IllegalStateException("Multiple NiFi distribution ZIP files are found in: " + location);
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.nifi.testharness.samples.mock; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class GetHTTPMock extends MockProcessor { + + private final String contentType; + private final Supplier<InputStream> inputStreamSupplier; + + public GetHTTPMock(String contentType, Supplier<InputStream> inputStreamSupplier) { + super("org.apache.nifi.processors.standard.GetHTTP"); + + this.contentType = contentType; + this.inputStreamSupplier = inputStreamSupplier; + } + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All files are transferred to the success relationship") + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory processSessionFactory) { + + final ComponentLog logger = getLogger(); + + final StopWatch stopWatch = new StopWatch(true); + + final ProcessSession session = processSessionFactory.createSession(); + + final String url = context.getProperty("URL").evaluateAttributeExpressions().getValue(); + final URI uri; + 
String source = url; + try { + uri = new URI(url); + source = uri.getHost(); + } catch (final URISyntaxException swallow) { + // this won't happen as the url has already been validated + } + + FlowFile flowFile = session.create(); + + flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty("Filename").evaluateAttributeExpressions().getValue()); + flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source); + flowFile = session.importFrom(inputStreamSupplier.get(), flowFile); + + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), contentType); + + final long flowFileSize = flowFile.getSize(); + stopWatch.stop(); + session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + + final String dataRate = stopWatch.calculateDataRate(flowFileSize); + logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate}); + session.commit(); + + } +} diff --git a/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java new file mode 100644 index 0000000..cd62b2d --- /dev/null +++ b/nifi-testharness/src/test/java/org/apache/nifi/testharness/samples/mock/MockProcessor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.testharness.samples.mock; + + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +public abstract class MockProcessor implements Processor { + + private final Processor delegate; + private ComponentLog logger; + + protected MockProcessor(String delegateClassName) { + try { + + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + final Class<?> delegateClass = Class.forName(delegateClassName, true, contextClassLoader); + + delegate = (Processor) delegateClass.newInstance(); + } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) { + throw new RuntimeException(e); + } + + + } + + protected Processor getDelegate() { + return delegate; + + } + + protected final ComponentLog getLogger() { + return logger; + } + + @Override + public void initialize(ProcessorInitializationContext processorInitializationContext) { + getDelegate().initialize(processorInitializationContext); + logger = processorInitializationContext.getLogger(); + } + + @Override + public Set<Relationship> 
getRelationships() { + return getDelegate().getRelationships(); + } + + @Override + public abstract void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory); + + @Override + public Collection<ValidationResult> validate(ValidationContext validationContext) { + return getDelegate().validate(validationContext); + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String s) { + return getDelegate().getPropertyDescriptor(s); + } + + @Override + public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) { + getDelegate().onPropertyModified(propertyDescriptor, s, s1); + } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { + return getDelegate().getPropertyDescriptors(); + } + + @Override + public String getIdentifier() { + return getDelegate().getIdentifier(); + } +} diff --git a/nifi-testharness/src/test/resources/flow.xml b/nifi-testharness/src/test/resources/flow.xml new file mode 100644 index 0000000..66a1cf2 --- /dev/null +++ b/nifi-testharness/src/test/resources/flow.xml @@ -0,0 +1,154 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController encoding-version="1.3"> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <registries/> + <rootGroup> + <id>92b74849-0166-1000-28d3-4da912e34551</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>92b9139c-0166-1000-04d5-1184adc0977a</id> + <name>PutFile</name> + <position x="632.0" y="98.0"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.PutFile</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.7.1</version> + </bundle> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + 
<bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <executionNode>ALL</executionNode> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Directory</name> + <value>./NiFiTest/NiFiReadTest</value> + </property> + <property> + <name>Conflict Resolution Strategy</name> + <value>ignore</value> + </property> + <property> + <name>Create Missing Directories</name> + <value>true</value> + </property> + <property> + <name>Maximum File Count</name> + </property> + <property> + <name>Last Modified Time</name> + </property> + <property> + <name>Permissions</name> + </property> + <property> + <name>Owner</name> + </property> + <property> + <name>Group</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + <autoTerminatedRelationship>failure</autoTerminatedRelationship> + </processor> + <processor> + <id>92b87553-0166-1000-527e-7ecdc888d91a</id> + <name>GetHTTP</name> + <position x="238.0" y="98.0"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.GetHTTP</class> + <bundle> + <group>org.apache.nifi</group> + <artifact>nifi-standard-nar</artifact> + <version>1.7.1</version> + </bundle> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <executionNode>ALL</executionNode> + <runDurationNanos>0</runDurationNanos> + <property> + <name>URL</name> + <value>http://feeds.bbci.co.uk/news/technology/rss.xml?edition=uk#</value> + </property> + <property> + <name>Filename</name> + <value>bbc-world.rss.xml</value> + </property> + <property> + <name>SSL Context Service</name> + </property> + <property> + 
<name>Username</name> + </property> + <property> + <name>Password</name> + </property> + <property> + <name>Connection Timeout</name> + <value>30 sec</value> + </property> + <property> + <name>Data Timeout</name> + <value>30 sec</value> + </property> + <property> + <name>User Agent</name> + </property> + <property> + <name>Accept Content-Type</name> + </property> + <property> + <name>Follow Redirects</name> + <value>false</value> + </property> + <property> + <name>redirect-cookie-policy</name> + <value>default</value> + </property> + <property> + <name>proxy-configuration-service</name> + </property> + <property> + <name>Proxy Host</name> + </property> + <property> + <name>Proxy Port</name> + </property> + </processor> + <connection> + <id>92b9380b-0166-1000-981d-c9e319f135e3</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>92b87553-0166-1000-527e-7ecdc888d91a</sourceId> + <sourceGroupId>92b74849-0166-1000-28d3-4da912e34551</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>92b9139c-0166-1000-04d5-1184adc0977a</destinationId> + <destinationGroupId>92b74849-0166-1000-28d3-4da912e34551</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>10000</maxWorkQueueSize> + <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> diff --git a/nifi-testharness/src/test/resources/logback-test.xml b/nifi-testharness/src/test/resources/logback-test.xml new file mode 100644 index 0000000..ab903af --- /dev/null +++ b/nifi-testharness/src/test/resources/logback-test.xml @@ -0,0 +1,15 @@ +<configuration debug="true"> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are by default assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder --> + <encoder> + 
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> \ No newline at end of file diff --git a/nifi-testharness/src/test/resources/sample_technology_rss.xml b/nifi-testharness/src/test/resources/sample_technology_rss.xml new file mode 100644 index 0000000..a95ba96 --- /dev/null +++ b/nifi-testharness/src/test/resources/sample_technology_rss.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<?xml-stylesheet title="XSL_formatting" type="text/xsl" href="/shared/bsp/xsl/rss/nolsol.xsl"?> +<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:media="http://search.yahoo.com/mrss/"> + <channel> + <title>Sample Technology feed</title> + <description>Sample Technology feed</description> + <image> + <url>https://nifi.apache.org/assets/images/apache-nifi-logo.svg</url> + <title>NiFi sample</title> + <link>https://nifi.apache.org/</link> + </image> + <language>en-gb</language> + <ttl>15</ttl> + <item> + <title>The latest NiFi version is out</title> + <description>The latest version of NiFi is released</description> + <link>https://nifi.apache.org/</link> + <guid isPermaLink="true">https://nifi.apache.org/</guid> + <pubDate>Mon, 24 Sep 2018 17:10:10 GMT</pubDate> + <media:thumbnail width="1024" height="576" url="https://nifi.apache.org/assets/images/apache-nifi-logo.svg"/> + </item> + </channel> +</rss> \ No newline at end of file diff --git a/pom.xml b/pom.xml index b6cf513..8037976 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ <module>nifi-mock</module> <module>nifi-nar-bundles</module> <module>nifi-assembly</module> + <module>nifi-testharness</module> <module>nifi-docs</module> <module>nifi-maven-archetypes</module> <module>nifi-external</module>