NIFI-626 - First working version.

Signed-off-by: Toivo Adams <toivo.ad...@gmail.com>
Signed-off-by: Mark Payne <marka...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/14bcad21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/14bcad21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/14bcad21

Branch: refs/heads/develop
Commit: 14bcad212a4e2c7242e892c0389c8da885e87e9c
Parents: 13addeb
Author: Toivo Adams <toivo.ad...@gmail.com>
Authored: Tue May 26 14:36:59 2015 +0300
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jun 17 12:35:54 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 77 +++++++++--------
 .../processors/standard/util/JdbcCommon.java    | 13 +--
 .../org.apache.nifi.processor.Processor         |  1 +
 .../processors/standard/TestExecuteSQL.java     | 91 +++++++++++++-------
 4 files changed, 101 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/14bcad21/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 9003b4a..37fd0f7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -30,24 +29,20 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
@@ -92,6 +87,7 @@ public class ExecuteSQL extends AbstractProcessor {
         relationships = Collections.unmodifiableSet(r);
 
         ArrayList<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
         pds.add(SQL_SELECT_QUERY);
         propDescriptors = Collections.unmodifiableList(pds);
     }
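
The DBCP_SERVICE descriptor added to the property list above is declared elsewhere in ExecuteSQL and is not part of this hunk. For readers unfamiliar with NiFi controller-service properties, a descriptor of this kind is typically built along the following lines (a sketch; the name and description strings here are illustrative, not taken from the commit):

    // Illustrative sketch -- identifiesControllerService() is what lets the
    // framework wire a DBCPService instance (like the "dbcp" service in the
    // test below) into this property.
    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain a connection to the database")
            .required(true)
            .identifiesControllerService(DBCPService.class)
            .build();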
@@ -108,47 +104,54 @@ public class ExecuteSQL extends AbstractProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
+        FlowFile incoming = session.get();
+        if (incoming == null) {
             return;
         }
 
         final ProcessorLog logger = getLogger();
 
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).getValue();
+        final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
+        final StopWatch stopWatch = new StopWatch(true);
 
         try {
             final Connection con = dbcpService.getConnection();
-            final Statement st = con.createStatement();
-
-            final StopWatch stopWatch = new StopWatch(true);
-
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        ResultSet resultSet = st.executeQuery(selectQuery);
-                        long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
-
-
-                    } catch (SQLException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                    }
+            try {
+                final Statement st = con.createStatement();
+                try {
+                    FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
+                        @Override
+                        public void process(final OutputStream out) throws IOException {
+                            try {
+                                logger.info("Starting to execute query {}", new Object[]{selectQuery});
+                                ResultSet resultSet = st.executeQuery(selectQuery);
+                                long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
+                                logger.info("Result FlowFile contains {} Avro records", new Object[]{nrOfRows});
+                            } catch (SQLException e) {
+                                throw new ProcessException(e);
+                            }
+                        }
+                    });
+
+                    logger.info("Transferred {} to 'success'", new Object[]{outgoing});
+                    session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                    session.transfer(outgoing, REL_SUCCESS);
+                } finally {
+                    st.close();
                 }
-            });
-
-            logger.info("Transferred {} to 'success'", new Object[]{flowFile});
-            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
+            } finally {
+                // return the connection to the pool
+                con.close();
+            }
 
-        } catch (FlowFileAccessException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+        } catch (ProcessException e) {
+            logger.error("Unable to execute SQL select query due to {}", new Object[]{e});
+            session.transfer(incoming, REL_FAILURE);
         } catch (SQLException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            logger.error("Unable to execute SQL select query due to {}", new Object[]{e});
+            session.transfer(incoming, REL_FAILURE);
         }
     }
 }
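
The nested try/finally blocks above guarantee that the Statement is closed and the Connection is handed back even when the query fails; close() on a pooled connection returns it to the pool rather than destroying it. On Java 7 and later the same guarantee can be expressed more compactly with try-with-resources; a sketch of the equivalent shape, not the committed code:

    try (final Connection con = dbcpService.getConnection();
         final Statement st = con.createStatement()) {
        // ... session.write(...) and session.transfer(outgoing, REL_SUCCESS) as above ...
    } catch (final SQLException e) {
        logger.error("Unable to execute SQL select query due to {}", new Object[]{e});
        session.transfer(incoming, REL_FAILURE);
    }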

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/14bcad21/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index a361bd6..c9ad423 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard.util;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.ResultSet;
@@ -30,9 +29,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
 
 
 /**
@@ -46,9 +43,6 @@ public class JdbcCommon {
                Schema schema = createSchema(rs);
                GenericRecord rec = new GenericData.Record(schema);
                
-//             ByteArrayOutputStream out = new ByteArrayOutputStream();
-//             BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-               
                DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
                DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
                dataFileWriter.create(schema, outStream);
@@ -67,10 +61,6 @@ public class JdbcCommon {
 
                dataFileWriter.close();
                return nrOfRows;
-//             encoder.flush();
-//             out.close();
-//             byte[] serializedBytes = out.toByteArray();
-//             return serializedBytes;
        }
        
        public static Schema createSchema(ResultSet rs) throws SQLException {
@@ -82,7 +72,8 @@ public class JdbcCommon {
                FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
                
                /**
-                *      Type conversion is not precise and is incomplete, needs to be fixed!!!!!!
+                *      Some Avro types are still missing (Decimal, Date);
+                *      may need some additional work.
                 */
                for (int i = 1; i <= nrOfColumns; i++)
                        switch (meta.getColumnType(i)) {
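
The hunk is cut off at the switch, but the intent is clear from the surrounding code: each JDBC column type is mapped to an Avro field on the builder. A hedged sketch of what such a mapping can look like with Avro's SchemaBuilder follows; the concrete cases in the commit may differ, and DECIMAL/DATE are exactly the gaps the comment calls out:

    // Sketch only -- the committed switch may cover different cases.
    for (int i = 1; i <= nrOfColumns; i++) {
        switch (meta.getColumnType(i)) {
            case java.sql.Types.CHAR:
            case java.sql.Types.VARCHAR:
                builder.name(meta.getColumnName(i)).type().stringType().noDefault();
                break;
            case java.sql.Types.INTEGER:
                builder.name(meta.getColumnName(i)).type().intType().noDefault();
                break;
            case java.sql.Types.BIGINT:
                builder.name(meta.getColumnName(i)).type().longType().noDefault();
                break;
            default:
                // DECIMAL, DATE, etc. would need explicit handling here.
                builder.name(meta.getColumnName(i)).type().stringType().noDefault();
                break;
        }
    }
    return builder.endRecord();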

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/14bcad21/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 17339bc..e62b57f 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -66,3 +66,4 @@ org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
+org.apache.nifi.processors.standard.ExecuteSQL

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/14bcad21/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 15bc06c..1b6de83 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -16,22 +16,32 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.Collection;
+import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -58,61 +68,76 @@ public class TestExecuteSQL {
     }
     
     @Test
-    public void test1() throws InitializationException {
+    public void test1() throws InitializationException, ClassNotFoundException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
         
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
-        dbcpProperties.put("Database Host", "NA");    // Embedded Derby don't use host
-        dbcpProperties.put("Database Port", "1");     // Embedded Derby don't use port, but must have value anyway
-        dbcpProperties.put("Database Name", DB_LOCATION);
-        dbcpProperties.put("Database User",     "tester");
-        dbcpProperties.put("Password", "testerp");
 
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
         
+        runner.enableControllerService(dbcp);
         runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
         
+        // remove previous test database, if any
+        File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        Connection con = dbcp.getConnection();
+        TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100);
+        System.out.println("test data loaded");
+        
+        // ResultSet size will be 1x100x100 = 10000 rows
+        // because of where PER.ID = ${person.id}
+        final int nrOfRows = 10000;
         String query = "select "
                        + "  PER.ID as PersonId, PER.NAME as PersonName, 
PER.CODE as PersonCode"
                        + ", PRD.ID as ProductId,PRD.NAME as 
ProductName,PRD.CODE as ProductCode"
                        + ", REL.ID as RelId,    REL.NAME as RelName,    
REL.CODE as RelCode"
                        + ", ROW_NUMBER() OVER () as rownr "
-                       + " from persons PER, products PRD, relationships REL";
+                       + " from persons PER, products PRD, relationships REL"
+                       + " where PER.ID = ${person.id}";
         
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
-        runner.enableControllerService(dbcp);
-        
-        runner.enqueue("Hello".getBytes());
+
+        // incoming FlowFile content is not used, but attributes are used
+        Map<String,String> attributes = new HashMap<String,String>();
+        attributes.put("person.id", "10");
+        runner.enqueue("Hello".getBytes(), attributes);
 
         runner.run();
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
-        runner.clearTransferState();
+
+        // read all Avro records and verify the created FlowFile contains 10000 records
+        List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
+        GenericRecord record = null;
+        long recordsFromStream = 0;
+        while (dataFileReader.hasNext()) {
+            // Reuse the record object by passing it to next(). This saves us from
+            // allocating and garbage collecting many objects for files with many items.
+            record = dataFileReader.next(record);
+            recordsFromStream += 1;
+        }
+        System.out.println("total nr of records from stream: " + recordsFromStream);
+        assertEquals(nrOfRows, recordsFromStream);
+        dataFileReader.close();
     }
 
     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *
      */
-    class DBCPServiceSimpleImpl implements DBCPService {
-
-               @Override
-               public void initialize(ControllerServiceInitializationContext context) throws InitializationException { }
-
-               @Override
-               public Collection<ValidationResult> validate(ValidationContext context) { return null; }
-
-               @Override
-               public PropertyDescriptor getPropertyDescriptor(String name) { return null; }
-
-               @Override
-               public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
-
-               @Override
-               public List<PropertyDescriptor> getPropertyDescriptors() { return null; }
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
 
                @Override
-               public String getIdentifier() { return null; }
+               public String getIdentifier() {
+                       return "dbcp";
+               }
 
                @Override
                public Connection getConnection() throws ProcessException {
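
The getConnection() body is truncated in this excerpt. Given the DriverManager import and the DB_LOCATION constant used earlier in the test, a minimal embedded-Derby implementation might look like the following sketch (an assumption, not necessarily the committed code):

    @Override
    public Connection getConnection() throws ProcessException {
        try {
            // Embedded Derby: the JDBC URL points at the on-disk DB_LOCATION
            // directory; create=true creates the database on first use.
            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
            return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
        } catch (ClassNotFoundException | SQLException e) {
            throw new ProcessException("getConnection failed: " + e);
        }
    }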
