http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java deleted file mode 100644 index 0654138..0000000 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.flume; - -import java.io.File; -import static org.junit.Assert.assertEquals; - -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import org.apache.commons.io.filefilter.HiddenFileFilter; -import org.apache.flume.sink.NullSink; -import org.apache.flume.source.AvroSource; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.util.file.FileUtils; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlumeSinkProcessorTest { - - private static final Logger logger = - LoggerFactory.getLogger(FlumeSinkProcessorTest.class); - - @Rule - public final TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - Collection<ValidationResult> results; - ProcessContext pc; - - results = new HashSet<>(); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required")); - } - - // non-existent class - results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name"); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink")); - } - - // class doesn't implement Sink - results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName()); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink")); - } - - results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(0, results.size()); - } - - - @Test - public void testNullSink() throws IOException { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); - try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); - runner.enqueue(inputStream, attributes); - runner.run(); - } - } - - @Test - public void testBatchSize() throws IOException { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, - "tier1.sinks.sink-1.batchSize = 1000\n"); - for (int i = 0; i < 100000; i++) { - runner.enqueue(String.valueOf(i).getBytes()); - } - runner.run(100); - } - - @Test - public void testHdfsSink() throws IOException { - File destDir = temp.newFolder("hdfs"); - - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs"); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, - "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" + - "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" + - "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" + - "tier1.sinks.sink-1.serializer.appendNewline = false" - ); - try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); - runner.enqueue(inputStream, attributes); - runner.run(); - } - - File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE); - assertEquals("Unexpected number of destination files.", 1, files.length); - File dst = files[0]; - byte[] expectedMd5; - try (InputStream md5Stream = getClass().getResourceAsStream("/testdata/records.txt")) { - expectedMd5 = FileUtils.computeMd5Digest(md5Stream); - } - byte[] actualMd5 = FileUtils.computeMd5Digest(dst); - Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java deleted file mode 100644 index bf32095..0000000 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.flume; - - -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.flume.sink.NullSink; -import org.apache.flume.source.AvroSource; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.util.file.FileUtils; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlumeSourceProcessorTest { - - private static final Logger logger = LoggerFactory.getLogger(FlumeSourceProcessorTest.class); - - @Rule - public final TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); - Collection<ValidationResult> results; - ProcessContext pc; - - results = new HashSet<>(); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required")); - } - - // non-existent class - results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name"); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because unable to load source")); - } - - // class doesn't implement Source - results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName()); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(1, results.size()); - for (ValidationResult vr : results) { - logger.debug(vr.toString()); - Assert.assertTrue(vr.toString().contains("is invalid because unable to create source")); - } - - results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName()); - runner.enqueue(new byte[0]); - pc = runner.getProcessContext(); - if (pc instanceof MockProcessContext) { - results = ((MockProcessContext) pc).validate(); - } - Assert.assertEquals(0, results.size()); - } - - @Test - public void testSequenceSource() { - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq"); - runner.run(); - List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); - Assert.assertEquals(1, flowFiles.size()); - for (MockFlowFile flowFile : flowFiles) { - logger.debug(flowFile.toString()); - Assert.assertEquals(1, flowFile.getSize()); - } - } - - @Test - public void testSourceWithConfig() throws IOException { - File spoolDirectory = temp.newFolder("spooldir"); - File dst = new File(spoolDirectory, "records.txt"); - FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false); - - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, - "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); - runner.run(1, false, true); - // Because the spool directory source is an event driven source, it may take some time for flow files to get - // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than - // that then there is likely a bug. - int numWaits = 10; - while (runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS).size() < 4 && --numWaits > 0) { - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException ex) { - logger.warn("Sleep interrupted"); - } - } - runner.shutdown(); - runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 4); - int i = 1; - for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS)) { - flowFile.assertContentEquals("record " + i); - i++; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml index a2742aa..59aab3c 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml @@ -32,7 +32,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-processors</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> + <version>0.2.0-incubating-SNAPSHOT</version> </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/286e4738/nifi/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index 3a5c6b7..66a328c 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -809,7 +809,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-nar</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> + <version>0.2.0-incubating-SNAPSHOT</version> <type>nar</type> </dependency> <dependency> @@ -864,10 +864,10 @@ <version>2.0.0</version> </dependency> <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.11.1.1</version> - </dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.11.1.1</version> + </dependency> </dependencies> </dependencyManagement> <dependencies>