Repository: incubator-nifi Updated Branches: refs/heads/develop a4d21e3cd -> 0af9c75d7
NIFI-626 - ExecuteSQL processor for executing arbitrary SQL queries, initial implementation. Signed-off-by: Toivo Adams <toivo.ad...@gmail.com> Signed-off-by: Mark Payne <marka...@hotmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/13addeb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/13addeb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/13addeb5 Branch: refs/heads/develop Commit: 13addeb571746f8ace93928ab423af3ed1a3bc3c Parents: 69f04cb Author: Toivo Adams <toivo.ad...@gmail.com> Authored: Mon May 25 15:13:57 2015 +0300 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Jun 17 12:35:53 2015 -0400 ---------------------------------------------------------------------- .../nifi-standard-processors/pom.xml | 545 ++++++++++--------- .../nifi/processors/standard/ExecuteSQL.java | 154 ++++++ .../processors/standard/util/JdbcCommon.java | 146 +++++ .../processors/standard/TestExecuteSQL.java | 129 +++++ .../standard/util/TestJdbcCommon.java | 153 ++++++ .../standard/util/TestJdbcHugeStream.java | 264 +++++++++ nifi/pom.xml | 5 + 7 files changed, 1126 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 8cce56d..3f83160 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -1,272 +1,277 @@ <?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or 
more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-bundle</artifactId> - <version>0.2.0-incubating-SNAPSHOT</version> - </parent> - <artifactId>nifi-standard-processors</artifactId> - <packaging>jar</packaging> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-processor-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-flowfile-packager</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-distributed-cache-client-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - 
<artifactId>nifi-http-context-map-api</artifactId> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </dependency> - <dependency> - <groupId>commons-net</groupId> - <artifactId>commons-net</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - <dependency> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-jdk16</artifactId> - </dependency> - <dependency> - <groupId>org.bouncycastle</groupId> - <artifactId>bcpg-jdk16</artifactId> - </dependency> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security-utils</artifactId> - </dependency> - <dependency> - <groupId>com.jcraft</groupId> - <artifactId>jsch</artifactId> - </dependency> - <dependency> - <groupId>com.jcraft</groupId> - <artifactId>jzlib</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - <dependency> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </dependency> - <dependency> - <groupId>com.github.jponge</groupId> - <artifactId>lzma-java</artifactId> - </dependency> - <dependency> - <groupId>org.tukaani</groupId> - <artifactId>xz</artifactId> - </dependency> - <dependency> - 
<groupId>net.sf.saxon</groupId> - <artifactId>Saxon-HE</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-socket-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-load-distribution-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-distributed-cache-client-service</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - <dependency> - <groupId>javax.jms</groupId> - <artifactId>javax.jms-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-client</artifactId> - </dependency> - <dependency> - <groupId>com.jayway.jsonpath</groupId> - <artifactId>json-path</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.tika</groupId> - <artifactId>tika-core</artifactId> - <version>1.7</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>2.4.5</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludes combine.children="append"> - <exclude>src/test/resources/localhost.cer</exclude> - <exclude>src/test/resources/hello.txt</exclude> - <exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude> - <exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt</exclude> - 
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude> - <exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude> - <exclude>src/test/resources/ExecuteCommand/test.txt</exclude> - <exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude> - <exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude> - <exclude>src/test/resources/ScanAttribute/dictionary1</exclude> - <exclude>src/test/resources/TestEncryptContent/text.txt</exclude> - <exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude> - <exclude>src/test/resources/TestJson/json-sample.json</exclude> - <exclude>src/test/resources/TestJson/control-characters.json</exclude> - <exclude>src/test/resources/TestMergeContent/demarcate</exclude> - <exclude>src/test/resources/TestMergeContent/foot</exclude> - <exclude>src/test/resources/TestMergeContent/head</exclude> - <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude> - <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude> - <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude> - <exclude>src/test/resources/TestModifyBytes/testFile.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/$1$1.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/Good.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/Spider.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/[DODO].txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/cu_Po.txt</exclude> - 
<exclude>src/test/resources/TestReplaceTextLineByLine/food.txt</exclude> - <exclude>src/test/resources/TestReplaceTextLineByLine/testFile.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt</exclude> - <exclude>src/test/resources/TestReplaceTextWithMapping/colors.txt</exclude> - <exclude>src/test/resources/TestScanContent/helloWorld</exclude> - <exclude>src/test/resources/TestScanContent/wellthengood-bye</exclude> - <exclude>src/test/resources/TestSplitText/1.txt</exclude> - <exclude>src/test/resources/TestSplitText/2.txt</exclude> - <exclude>src/test/resources/TestSplitText/3.txt</exclude> - <exclude>src/test/resources/TestSplitText/4.txt</exclude> - <exclude>src/test/resources/TestSplitText/5.txt</exclude> - <exclude>src/test/resources/TestSplitText/6.txt</exclude> - <exclude>src/test/resources/TestSplitText/original.txt</exclude> - <exclude>src/test/resources/TestTransformXml/math.html</exclude> - <exclude>src/test/resources/TestTransformXml/tokens.csv</exclude> - 
<exclude>src/test/resources/TestTransformXml/tokens.xml</exclude> - <exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude> - <exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude> - <exclude>src/test/resources/TestUnpackContent/data.flowfilev2</exclude> - <exclude>src/test/resources/TestUnpackContent/data.flowfilev3</exclude> - <exclude>src/test/resources/TestXml/xml-bundle-1</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude> - <exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude> - <exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude> - <exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude> - <exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude> - <exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude> - <exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude> - <exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.txt.bz2</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.txt.gz</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/1.zip</exclude> - <exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude> - <exclude>src/test/resources/TestUnpackContent/data.tar</exclude> - <exclude>src/test/resources/TestUnpackContent/data.zip</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. 
The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-bundle</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + </parent> + <artifactId>nifi-standard-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flowfile-packager</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-http-context-map-api</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + 
</dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </dependency> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk16</artifactId> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpg-jdk16</artifactId> + </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security-utils</artifactId> + </dependency> + <dependency> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + </dependency> + <dependency> + <groupId>com.jcraft</groupId> + <artifactId>jzlib</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + </dependency> + <dependency> + <groupId>com.github.jponge</groupId> + <artifactId>lzma-java</artifactId> + </dependency> + <dependency> + <groupId>org.tukaani</groupId> + <artifactId>xz</artifactId> + </dependency> + <dependency> + <groupId>net.sf.saxon</groupId> + <artifactId>Saxon-HE</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + 
<artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-socket-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-load-distribution-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>javax.jms-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-core</artifactId> + <version>1.7</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.4.5</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-dbcp-service-api</artifactId> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/localhost.cer</exclude> + <exclude>src/test/resources/hello.txt</exclude> + <exclude>src/test/resources/CharacterSetConversionSamples/Converted.txt</exclude> + <exclude>src/test/resources/CharacterSetConversionSamples/Original.txt</exclude> + 
<exclude>src/test/resources/CompressedData/SampleFile.txt</exclude> + <exclude>src/test/resources/CompressedData/SampleFileConcat.txt</exclude> + <exclude>src/test/resources/ExecuteCommand/1000bytes.txt</exclude> + <exclude>src/test/resources/ExecuteCommand/test.txt</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary-with-empty-new-lines</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary-with-extra-info</exclude> + <exclude>src/test/resources/ScanAttribute/dictionary1</exclude> + <exclude>src/test/resources/TestEncryptContent/text.txt</exclude> + <exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude> + <exclude>src/test/resources/TestJson/json-sample.json</exclude> + <exclude>src/test/resources/TestJson/control-characters.json</exclude> + <exclude>src/test/resources/TestMergeContent/demarcate</exclude> + <exclude>src/test/resources/TestMergeContent/foot</exclude> + <exclude>src/test/resources/TestMergeContent/head</exclude> + <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude> + <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude> + <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude> + <exclude>src/test/resources/TestModifyBytes/testFile.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/$1$1.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/BRue_cRue_RiRey.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Blu$2e_clu$2e.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/D$d_h$d.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Good.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/Spider.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/[DODO].txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/cu[$1]_Po[$1].txt</exclude> + 
<exclude>src/test/resources/TestReplaceTextLineByLine/cu_Po.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/food.txt</exclude> + <exclude>src/test/resources/TestReplaceTextLineByLine/testFile.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-blank-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-escaped-dollar-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping-simple.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-excessive-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-invalid-backreference-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-no-match-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/color-fruit-space-mapping.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/colors-without-dashes.txt</exclude> + <exclude>src/test/resources/TestReplaceTextWithMapping/colors.txt</exclude> + <exclude>src/test/resources/TestScanContent/helloWorld</exclude> + <exclude>src/test/resources/TestScanContent/wellthengood-bye</exclude> + <exclude>src/test/resources/TestSplitText/1.txt</exclude> + <exclude>src/test/resources/TestSplitText/2.txt</exclude> + <exclude>src/test/resources/TestSplitText/3.txt</exclude> + <exclude>src/test/resources/TestSplitText/4.txt</exclude> + <exclude>src/test/resources/TestSplitText/5.txt</exclude> + <exclude>src/test/resources/TestSplitText/6.txt</exclude> + <exclude>src/test/resources/TestSplitText/original.txt</exclude> + <exclude>src/test/resources/TestTransformXml/math.html</exclude> + 
<exclude>src/test/resources/TestTransformXml/tokens.csv</exclude> + <exclude>src/test/resources/TestTransformXml/tokens.xml</exclude> + <exclude>src/test/resources/TestUnpackContent/folder/cal.txt</exclude> + <exclude>src/test/resources/TestUnpackContent/folder/date.txt</exclude> + <exclude>src/test/resources/TestUnpackContent/data.flowfilev2</exclude> + <exclude>src/test/resources/TestUnpackContent/data.flowfilev3</exclude> + <exclude>src/test/resources/TestXml/xml-bundle-1</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt.bz2</exclude> + <exclude>src/test/resources/CompressedData/SampleFile.txt.gz</exclude> + <exclude>src/test/resources/CompressedData/SampleFile1.txt.bz2</exclude> + <exclude>src/test/resources/CompressedData/SampleFile1.txt.gz</exclude> + <exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude> + <exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude> + <exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude> + <exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.txt.bz2</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.txt.gz</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/1.zip</exclude> + <exclude>src/test/resources/TestIdentifyMimeType/flowfilev1.tar</exclude> + <exclude>src/test/resources/TestUnpackContent/data.tar</exclude> + <exclude>src/test/resources/TestUnpackContent/data.zip</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> </project> 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java new file mode 100644 index 0000000..9003b4a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+/**
+ * Executes a SQL select query on a connection obtained from a {@link DBCPService} and writes the
+ * result set into the incoming FlowFile's content in Avro format (via {@link JdbcCommon#convertToAvroStream}).
+ * <p>
+ * The query may use Expression Language, evaluated against the incoming FlowFile. On success the FlowFile is
+ * routed to {@link #REL_SUCCESS}; if obtaining a connection, executing the query, or writing the content fails,
+ * the FlowFile is penalized and routed to {@link #REL_FAILURE}.
+ */
+@EventDriven
+@Tags({"sql", "select", "jdbc", "query", "database"})
+@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+        + " Streaming is used so arbitrarily large result sets are supported.")
+public class ExecuteSQL extends AbstractProcessor {
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result set.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
+            .build();
+    private final Set<Relationship> relationships;
+
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
+            .name("SQL select query")
+            .description("SQL select query")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    private final List<PropertyDescriptor> propDescriptors;
+
+    public ExecuteSQL() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        // Failure must be advertised too, otherwise onTrigger could not route failed FlowFiles to it.
+        r.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        // Without DBCP_SERVICE here the required connection pool could never be configured.
+        pds.add(DBCP_SERVICE);
+        pds.add(SQL_SELECT_QUERY);
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        // The query property declares Expression Language support, so evaluate it against the incoming FlowFile.
+        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final StopWatch stopWatch = new StopWatch(true);
+
+        // try-with-resources closes the statement and the connection on every path, including failures.
+        try (final Connection con = dbcpService.getConnection();
+                final Statement st = con.createStatement()) {
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
+                        JdbcCommon.convertToAvroStream(resultSet, out);
+                    } catch (final SQLException e) {
+                        // Propagate rather than swallow, so a failed query is never routed to success.
+                        throw new ProcessException(e);
+                    }
+                }
+            });
+        } catch (final ProcessException | SQLException e) {
+            logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
+                    new Object[]{selectQuery, flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+        session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java new file mode 100644 index 0000000..a361bd6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.SchemaBuilder.FieldAssembler; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; + + +/** + * JDBC / SQL common functions. 
+ * + */ +public class JdbcCommon { + + public static long convertToAvroStream(ResultSet rs, OutputStream outStream) throws SQLException, IOException { + + Schema schema = createSchema(rs); + GenericRecord rec = new GenericData.Record(schema); + +// ByteArrayOutputStream out = new ByteArrayOutputStream(); +// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); + DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter); + dataFileWriter.create(schema, outStream); + + ResultSetMetaData meta = rs.getMetaData(); + int nrOfColumns = meta.getColumnCount(); + long nrOfRows = 0; + while (rs.next()) { + for (int i = 1; i <= nrOfColumns; i++) { + Object value = rs.getObject(i); + rec.put(i-1, value); + } + dataFileWriter.append(rec); + nrOfRows += 1; + } + + dataFileWriter.close(); + return nrOfRows; +// encoder.flush(); +// out.close(); +// byte[] serializedBytes = out.toByteArray(); +// return serializedBytes; + } + + public static Schema createSchema(ResultSet rs) throws SQLException { + + ResultSetMetaData meta = rs.getMetaData(); + int nrOfColumns = meta.getColumnCount(); + String tableName = meta.getTableName(1); + + FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); + + /** + * Type conversion is not precise and is incomplete, needs to be fixed!!!!!! 
+ */ + for (int i = 1; i <= nrOfColumns; i++) + switch (meta.getColumnType(i)) { + + case java.sql.Types.CHAR: + case java.sql.Types.LONGNVARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.NVARCHAR: + case java.sql.Types.VARCHAR: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + case java.sql.Types.BOOLEAN: + builder.name(meta.getColumnName(i)).type().booleanType().noDefault(); + break; + + case java.sql.Types.INTEGER: + case java.sql.Types.SMALLINT: + case java.sql.Types.TINYINT: + builder.name(meta.getColumnName(i)).type().intType().noDefault(); + break; + + case java.sql.Types.BIGINT: + builder.name(meta.getColumnName(i)).type().longType().noDefault(); + break; + + // java.sql.RowId is interface, is seems to be database implementation specific, let's convert to String + case java.sql.Types.ROWID: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + case java.sql.Types.FLOAT: + case java.sql.Types.REAL: + builder.name(meta.getColumnName(i)).type().floatType().noDefault(); + break; + + case java.sql.Types.DOUBLE: + builder.name(meta.getColumnName(i)).type().doubleType().noDefault(); + break; + + // TODO Did not find direct suitable type, need to be clarified!!!! + case java.sql.Types.DECIMAL: + case java.sql.Types.NUMERIC: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + // TODO Did not find direct suitable type, need to be clarified!!!! 
+ case java.sql.Types.DATE: + case java.sql.Types.TIME: + case java.sql.Types.TIMESTAMP: + builder.name(meta.getColumnName(i)).type().stringType().noDefault(); + break; + + default: + break; + } + return builder.endRecord(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java new file mode 100644 index 0000000..15bc06c --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestExecuteSQL { + + private static Logger LOGGER; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQL", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQL", "debug"); + LOGGER = LoggerFactory.getLogger(TestExecuteSQL.class); + } + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Test + public void test1() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class); + + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map<String, String> dbcpProperties = new HashMap<>(); + dbcpProperties.put("Database Host", "NA"); // Embedded Derby don't use host + dbcpProperties.put("Database Port", "1"); // Embedded Derby don't use port, but must have value anyway + 
dbcpProperties.put("Database Name", DB_LOCATION); + dbcpProperties.put("Database User", "tester"); + dbcpProperties.put("Password", "testerp"); + + runner.addControllerService("dbcp", dbcp, dbcpProperties); + + runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp"); + + String query = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"; + + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); + runner.enableControllerService(dbcp); + + runner.enqueue("Hello".getBytes()); + + runner.run(); + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + /** + * Simple implementation only for ExecuteSQL processor testing. + * + */ + class DBCPServiceSimpleImpl implements DBCPService { + + @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { } + + @Override + public Collection<ValidationResult> validate(ValidationContext context) { return null; } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { return null; } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { return null; } + + @Override + public String getIdentifier() { return null; } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } catch (Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java new file mode 100644 index 0000000..b7ab1d8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard.util; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestJdbcCommon { + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))"; + String dropTable = "drop table restaurants"; + + @Test + public void testCreateSchema() throws ClassNotFoundException, SQLException { + + // remove previous test database, if any + File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + Connection con = createConnection(); + Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (Exception e) { + // table may not exist, this is not serious problem. 
+ } + + st.executeUpdate(createTable); + st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')"); + st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')"); + st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')"); + + ResultSet resultSet = st.executeQuery("select * from restaurants"); + + Schema schema = JdbcCommon.createSchema(resultSet); + assertNotNull(schema); + + // records name, should be result set first column table name + // Notice! sql select may join data from different tables, other columns may have different table names + assertEquals("RESTAURANTS", schema.getName()); + assertNotNull(schema.getField("ID")); + assertNotNull(schema.getField("NAME")); + assertNotNull(schema.getField("CITY")); + + st.close(); + con.close(); + } + + @Test + public void testConvertToBytes() throws ClassNotFoundException, SQLException, IOException { + // remove previous test database, if any + File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + Connection con = createConnection(); + Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (Exception e) { + // table may not exist, this is not serious problem. 
+ } + + st.executeUpdate(createTable); + + st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')"); + st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')"); + st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')"); + + ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R"); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream); + + byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + st.close(); + con.close(); + + // Deserialize bytes to records + + InputStream instream = new ByteArrayInputStream(serializedBytes); + + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); + DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader); + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with many items. 
+ record = dataFileReader.next(record); + System.out.println(record); + } + } + + // many test use Derby as database, so ensure driver is available + @Test + public void testDriverLoad() throws ClassNotFoundException { + Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + assertNotNull(clazz); + } + + private Connection createConnection() throws ClassNotFoundException, SQLException { + + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java new file mode 100644 index 0000000..8c54bc0 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test streaming using large number of result set rows. + * 1. Read data from database. + * 2. Create Avro schema from ResultSet meta data. + * 3. Read rows from ResultSet and write rows to Avro writer stream + * (Avro will create record for each row). + * 4. And finally read records from Avro stream to verify all data is present in Avro stream. + * + * + * Sql query will return all combinations from 3 table. + * For example when each table contain 1000 rows, result set will be 1 000 000 000 rows. 
+ * + */ +public class TestJdbcHugeStream { + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + /** + * In case of large record set this will fail with + * java.lang.OutOfMemoryError: Java heap space + * at java.util.Arrays.copyOf(Arrays.java:2271) + * at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) + * at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) + * at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) + * at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446) + * + */ +// @Test + public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + Connection con = createConnection(); + loadTestData2Database(con, 150, 150, 150); + System.out.println("test data loaded"); + + Statement st = con.createStatement(); + + // Notice! + // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not used. 
+ ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + System.out.println("total nr of rows in resultset: " + nrOfRows); + + byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + // Deserialize bytes to records + + InputStream instream = new ByteArrayInputStream(serializedBytes); + + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); + DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader); + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with many items. + record = dataFileReader.next(record); +// System.out.println(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + st.close(); + con.close(); + } + + @Test + public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + Connection con = createConnection(); + loadTestData2Database(con, 300, 300, 300); + System.out.println("test data loaded"); + + Statement st = con.createStatement(); + + // Notice! 
+ // Following select is deliberately invalid! + // For testing we need huge amount of rows, so where part is not used. + ResultSet resultSet = st.executeQuery("select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL"); + + OutputStream outStream = new FileOutputStream("target/data.avro"); + long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream); + System.out.println("total nr of rows in resultset: " + nrOfRows); +/* + byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); +*/ + // Deserialize bytes to records + + InputStream instream = new FileInputStream("target/data.avro"); + + DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(); + DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader); + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with many items. 
+ record = dataFileReader.next(record); +// System.out.println(record); + recordsFromStream += 1; + } + System.out.println("total nr of records from stream: " + recordsFromStream); + assertEquals(nrOfRows, recordsFromStream); + st.close(); + con.close(); + } + + //================================================ helpers =============================================== + + static String dropPersons = "drop table persons"; + static String dropProducts = "drop table products"; + static String dropRelationships= "drop table relationships"; + static String createPersons = "create table persons (id integer, name varchar(100), code integer)"; + static String createProducts = "create table products (id integer, name varchar(100), code integer)"; + static String createRelationships = "create table relationships(id integer,name varchar(100), code integer)"; + + static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException { + + System.out.println(createRandomName()); + System.out.println(createRandomName()); + System.out.println(createRandomName()); + + Statement st = con.createStatement(); + + // tables may not exist, this is not serious problem. 
+ try { st.executeUpdate(dropPersons); + } catch (Exception e) { } + + try { st.executeUpdate(dropProducts); + } catch (Exception e) { } + + try { st.executeUpdate(dropRelationships); + } catch (Exception e) { } + + st.executeUpdate(createPersons); + st.executeUpdate(createProducts); + st.executeUpdate(createRelationships); + + for (int i = 0; i < nrOfPersons; i++) + loadPersons(st, i); + + for (int i = 0; i < nrOfProducts; i++) + loadProducts(st, i); + + for (int i = 0; i < nrOfRels; i++) + loadRelationships(st, i); + + st.close(); + } + + static Random rng = new Random(53495); + + static private void loadPersons(Statement st, int nr) throws SQLException { + + st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + } + + static private void loadProducts(Statement st, int nr) throws SQLException { + + st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + } + + static private void loadRelationships(Statement st, int nr) throws SQLException { + + st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" ); + } + + static private String createRandomName() { + return createRandomString() + " " + createRandomString(); + } + + static private String createRandomString() { + + int length = rng.nextInt(19); + String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + char[] text = new char[length]; + for (int i = 0; i < length; i++) + { + text[i] = characters.charAt(rng.nextInt(characters.length())); + } + return new String(text); + } + + private Connection createConnection() throws ClassNotFoundException, SQLException { + + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/13addeb5/nifi/pom.xml 
---------------------------------------------------------------------- diff --git a/nifi/pom.xml b/nifi/pom.xml index 6cc7666..039057e 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -848,6 +848,11 @@ <version>0.2.0-incubating-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-dbcp-service-api</artifactId> + <version>0.2.0-incubating-SNAPSHOT</version> + </dependency> + <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>2.0.0</version>