NIFI-589: Updated with first review feedback * Switched to using `getResourceAsStream()` where possible * Removed trailing whitespace from added files * Added missing license headers * Added RAT exception to testdata files * Fixed POM errors that broke the build * Switched to using TemporaryFolder instead of putting files in `target` * Used try-with-resources where needed to autoclose streams * Moved logging configuration to properties files * Removed AbstractFlumeTest * Fixed logging levels in test code
Signed-off-by: Matt Gilman <matt.c.gil...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3529bb33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3529bb33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3529bb33 Branch: refs/heads/develop Commit: 3529bb33178ce70a33cc7ab21d7334ba3411048b Parents: 3b9e482 Author: Joey Echeverria <joe...@gmail.com> Authored: Fri Jun 5 14:00:37 2015 -0700 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Tue Jul 14 14:50:16 2015 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/util/file/FileUtils.java | 27 +++- .../nifi-flume-processors/pom.xml | 127 +++++++++++-------- .../nifi/processors/flume/NifiChannel.java | 18 ++- .../processors/flume/NifiChannelSelector.java | 18 ++- .../nifi/processors/flume/NifiTransaction.java | 17 ++- .../processors/flume/util/FlowFileEvent.java | 21 ++- .../flume/util/FlowFileEventConstants.java | 17 ++- .../processors/flume/AbstractFlumeTest.java | 35 ----- .../flume/FlumeSinkProcessorTest.java | 59 +++++---- .../flume/FlumeSourceProcessorTest.java | 25 ++-- .../src/test/resources/core-site.xml | 5 + .../src/test/resources/log4j.properties | 20 +++ .../src/test/resources/simplelogger.properties | 20 +++ nifi/pom.xml | 1 + 14 files changed, 261 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java index 7661e2d..ff4da8e 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java @@ -560,6 +560,19 @@ public class FileUtils { * @throws IOException if the MD5 hash could not be computed */ public static byte[] computeMd5Digest(final File file) throws IOException { + try (final FileInputStream fis = new FileInputStream(file)) { + return computeMd5Digest(fis); + } + } + + /** + * Returns the MD5 hash of the given stream. + * + * @param stream an input stream + * @return the MD5 hash + * @throws IOException if the MD5 hash could not be computed + */ + public static byte[] computeMd5Digest(final InputStream stream) throws IOException { final MessageDigest digest; try { digest = MessageDigest.getInstance("MD5"); @@ -567,15 +580,15 @@ public class FileUtils { throw new IOException(nsae); } - try (final FileInputStream fis = new FileInputStream(file)) { - int len; - final byte[] buffer = new byte[8192]; - while ((len = fis.read(buffer)) > -1) { - if (len > 0) { - digest.update(buffer, 0, len); - } + + int len; + final byte[] buffer = new byte[8192]; + while ((len = stream.read(buffer)) > -1) { + if (len > 0) { + digest.update(buffer, 0, len); } } + return digest.digest(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml index bd26a99..b903f21 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -38,14 +38,14 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-flowfile-packager</artifactId> </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-sdk</artifactId> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> <version>1.5.2</version> <exclusions> <exclusion> @@ -53,74 +53,89 @@ <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> - </dependency> + </dependency> <!-- Flume Sources --> - <dependency> - <groupId>org.apache.flume.flume-ng-sources</groupId> - <artifactId>flume-twitter-source</artifactId> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-twitter-source</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume.flume-ng-sources</groupId> - <artifactId>flume-jms-source</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-jms-source</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume.flume-ng-sources</groupId> - <artifactId>flume-scribe-source</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sources</groupId> + <artifactId>flume-scribe-source</artifactId> <version>1.5.2</version> - </dependency> + </dependency> <!-- Flume Sinks --> - <dependency> - <groupId>org.apache.flume.flume-ng-sinks</groupId> - <artifactId>flume-hdfs-sink</artifactId> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-hdfs-sink</artifactId> <version>1.5.2</version> - </dependency> + </dependency> <!-- HDFS sink dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flume.flume-ng-sinks</groupId> - <artifactId>flume-irc-sink</artifactId> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-irc-sink</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume.flume-ng-sinks</groupId> - <artifactId>flume-ng-elasticsearch-sink</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-elasticsearch-sink</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume.flume-ng-sinks</groupId> - <artifactId>flume-ng-hbase-sink</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-hbase-sink</artifactId> <version>1.5.2</version> - </dependency> - <dependency> - <groupId>org.apache.flume.flume-ng-sinks</groupId> - <artifactId>flume-ng-morphline-solr-sink</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume.flume-ng-sinks</groupId> + <artifactId>flume-ng-morphline-solr-sink</artifactId> <version>1.5.2</version> - </dependency> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - <scope>test</scope> - </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/testdata/*</exclude> <!-- test data --> + </excludes> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java index ac8dbe2..c4d3bef 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.flume; import org.apache.flume.Context; @@ -24,7 +39,6 @@ public class NifiChannel extends BasicChannelSemantics { @Override public void configure(Context context) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java index 792678b..2b0ba77 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.flume; import com.google.common.collect.ImmutableList; @@ -36,7 +51,6 @@ public class NifiChannelSelector implements ChannelSelector { @Override public void setChannels(List<Channel> channels) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java index 3d6a647..37c8a50 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.flume; import org.apache.flume.Event; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java index c3531ca..5dc97d6 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.flume.util; import com.google.common.collect.Maps; @@ -56,7 +71,7 @@ public class FlowFileEvent implements Event { } headers.put(LINEAGE_START_DATE_HEADER, Long.toString(flowFile.getLineageStartDate())); headers.put(SIZE_HEADER, Long.toString(flowFile.getSize())); - + headersLoaded = true; } } @@ -83,7 +98,7 @@ public class FlowFileEvent implements Event { if (flowFile.getSize() > Integer.MAX_VALUE) { throw new RuntimeException("Can't get body of Event because the backing FlowFile is too large (" + flowFile.getSize() + " bytes)"); } - + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize()); session.read(flowFile, new InputStreamCallback() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java index c13f0ef..c9650c1 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java @@ -1,4 +1,19 @@ - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.flume.util; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java deleted file mode 100644 index 87b056a..0000000 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.flume; - -import org.junit.BeforeClass; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AbstractFlumeTest { - - private static Logger logger; - - @BeforeClass - public static void setUpClass() { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.flume", "debug"); - logger = LoggerFactory.getLogger(AbstractFlumeTest.class); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java index d22514f..2e10c24 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java @@ -19,9 +19,9 @@ package org.apache.nifi.processors.flume; import java.io.File; import static org.junit.Assert.assertEquals; -import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -39,15 +39,20 @@ import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlumeSinkProcessorTest { - private static final Logger logger = - LoggerFactory.getLogger(FlumeSinkProcessorTest.class); - + private static final Logger logger = + LoggerFactory.getLogger(FlumeSinkProcessorTest.class); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + @Test public void testValidators() { TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); @@ -62,7 +67,7 @@ public class FlumeSinkProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required")); } @@ -76,7 +81,7 @@ public class FlumeSinkProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink")); } @@ -90,7 +95,7 @@ public class FlumeSinkProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink")); } @@ -109,12 +114,12 @@ public class FlumeSinkProcessorTest { public void testNullSink() throws IOException { TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); - FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt"); - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); - runner.enqueue(fis, attributes); - runner.run(); - fis.close(); + try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(inputStream, attributes); + runner.run(); + } } @Test @@ -129,15 +134,10 @@ public class FlumeSinkProcessorTest { } runner.run(); } - + @Test public void testHdfsSink() throws IOException { - File destDir = new File("target/hdfs"); - if (destDir.exists()) { - FileUtils.deleteFilesInDir(destDir, null, logger); - } else { - destDir.mkdirs(); - } + File destDir = temp.newFolder("hdfs"); TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs"); @@ -147,19 +147,22 @@ public class FlumeSinkProcessorTest { "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" + "tier1.sinks.sink-1.serializer.appendNewline = false" ); - FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt"); - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); - runner.enqueue(fis, attributes); - runner.run(); - fis.close(); + try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); + runner.enqueue(inputStream, attributes); + runner.run(); + } File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE); assertEquals("Unexpected number of destination files.", 1, files.length); File dst = files[0]; - byte[] expectedMd5 = FileUtils.computeMd5Digest(new File("src/test/resources/testdata/records.txt")); + byte[] expectedMd5; + try (InputStream md5Stream = getClass().getResourceAsStream("/testdata/records.txt")) { + expectedMd5 = FileUtils.computeMd5Digest(md5Stream); + } byte[] actualMd5 = FileUtils.computeMd5Digest(dst); Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5); } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java index bbcf116..043e115 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java @@ -34,15 +34,18 @@ import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlumeSourceProcessorTest { - private static final Logger logger = - LoggerFactory.getLogger(FlumeSourceProcessorTest.class); + private static final Logger logger = LoggerFactory.getLogger(FlumeSourceProcessorTest.class); + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); @Test public void testValidators() { @@ -58,7 +61,7 @@ public class FlumeSourceProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required")); } @@ -72,7 +75,7 @@ public class FlumeSourceProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because unable to load source")); } @@ -86,7 +89,7 @@ public class FlumeSourceProcessorTest { } Assert.assertEquals(1, results.size()); for (ValidationResult vr : results) { - logger.error(vr.toString()); + logger.debug(vr.toString()); Assert.assertTrue(vr.toString().contains("is invalid because unable to create source")); } @@ -108,22 +111,16 @@ public class FlumeSourceProcessorTest { List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); Assert.assertEquals(1, flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - logger.error(flowFile.toString()); + logger.debug(flowFile.toString()); Assert.assertEquals(1, flowFile.getSize()); } } @Test public void testSourceWithConfig() throws IOException { - File spoolDirectory = new File("target/spooldir"); - if (spoolDirectory.exists()) { - FileUtils.deleteFilesInDir(spoolDirectory, null, logger); - } else { - spoolDirectory.mkdirs(); - } - File src = new File("src/test/resources/testdata/records.txt"); + File spoolDirectory = temp.newFolder("spooldir"); File dst = new File(spoolDirectory, "records.txt"); - FileUtils.copyFile(src, dst, false, false, logger); + FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false); TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml index 5e3b55c..849854b 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml @@ -20,6 +20,11 @@ <configuration> <property> <name>fs.defaultFS</name> + <!-- + Hadoop doesn't support a chroot style operation for the + local filesystem so there's no benefit to setting this + to a directory other than '/' + --> <value>file:///</value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000..8c502ec --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties new file mode 100644 index 0000000..4994e7f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.slf4j.simpleLogger.logFile=System.out +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS +org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.apache.nifi.processors.flume=debug http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index 682a426..3a5c6b7 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -805,6 +805,7 @@ <artifactId>nifi-geo-nar</artifactId> <version>0.2.0-incubating-SNAPSHOT</version> <type>nar</type> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-nar</artifactId>