NIFI-626: Code cleanup to adhere to NiFi coding styles

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a3b8e44a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a3b8e44a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a3b8e44a

Branch: refs/heads/develop
Commit: a3b8e44ad5aff7b68dde26c242ba171d1884b948
Parents: d98757e
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jun 10 12:16:52 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jun 17 12:37:25 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    |  71 +++--
 .../processors/standard/util/JdbcCommon.java    | 181 +++++++------
 .../processors/standard/TestExecuteSQL.java     |  57 ++--
 .../standard/util/TestJdbcHugeStream.java       | 259 +++++++++----------
 nifi/pom.xml                                    |   5 +
 5 files changed, 280 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a3b8e44a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 07e6fe9..6647c4c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.StopWatch;
 
 @EventDriven
@@ -62,7 +63,7 @@ public class ExecuteSQL extends AbstractProcessor {
                .description("SQL query execution failed. Incoming FlowFile 
will be penalized and routed to this relationship")
                .build();
     private final Set<Relationship> relationships;
-    
+
     public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
                .name("Database Connection Pooling Service")
                .description("The Controller Service that is used to obtain 
connection to database")
@@ -91,11 +92,11 @@ public class ExecuteSQL extends AbstractProcessor {
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
-        HashSet<Relationship> r = new HashSet<>();
+        final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
         relationships = Collections.unmodifiableSet(r);
 
-        ArrayList<PropertyDescriptor> pds = new ArrayList<>();
+        final List<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
         pds.add(SQL_SELECT_QUERY);
         pds.add(QUERY_TIMEOUT);
@@ -113,7 +114,7 @@ public class ExecuteSQL extends AbstractProcessor {
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile incoming = session.get();
         if (incoming == null) {
             return;
@@ -126,44 +127,30 @@ public class ExecuteSQL extends AbstractProcessor {
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
 
                final StopWatch stopWatch = new StopWatch(true);
-        
-        try {
-                       final Connection con = dbcpService.getConnection();
-                       try {
-                               final Statement st = con.createStatement();
-                               try {
-                                       st.setQueryTimeout(queryTimeout);       // timeout in seconds
-                                       FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
-                                               @Override
-                                               public void process(final OutputStream out) throws IOException {
-                                                       try {
-                                                               logger.info("start executing query {}", new Object[]{selectQuery});
-                                                               ResultSet resultSet = st.executeQuery(selectQuery);
-                                                       Long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
-                                                               logger.info("Result FlowFile contain {} Avro records", new Object[]{nrOfRows});
-                                                               
-                                                       } catch (SQLException e) {
-                                                               throw new ProcessException(e);
-                                                       }
-                                               }
-                                       });
-       
-                                       logger.info("Transferred {} to 'success'", new Object[]{outgoing});
-                                       session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-                                       session.transfer(outgoing, REL_SUCCESS);
-                               } finally {
-                                       st.close();
-                               }
-                       } finally {
-                               // return connection to pool
-                               con.close();
-                       }
-
-               } catch (ProcessException e) {
-                       logger.error("Unable to execute sql select query due to {}", new Object[]{e});
-                       session.transfer(incoming, REL_FAILURE);
-               } catch (SQLException e) {
-                       logger.error("Unable to execute sql select query due to {}", new Object[]{e});
+
+        try (final Connection con = dbcpService.getConnection();
+            final Statement st = con.createStatement()) {
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+            final LongHolder nrOfRows = new LongHolder(0L);
+            FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    try {
+                        logger.debug("Executing query {}", new Object[] { selectQuery });
+                        final ResultSet resultSet = st.executeQuery(selectQuery);
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }
+                }
+            });
+
+            logger.info("{} contains {} Avro records", new Object[] { outgoing, nrOfRows.get() });
+            logger.info("Transferred {} to 'success'", new Object[] { outgoing });
+            session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(outgoing, REL_SUCCESS);
+        } catch (final ProcessException | SQLException e) {
+            logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
                        session.transfer(incoming, REL_FAILURE);
                }
     }
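
The refactored onTrigger above replaces the nested try/finally blocks with try-with-resources and introduces NiFi's LongHolder so that the row count computed inside the anonymous OutputStreamCallback can be read after session.write() returns. A minimal standalone sketch of that holder idiom, using java.util.concurrent.atomic.AtomicLong and a hypothetical write() callback API in place of the NiFi classes:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class HolderSketch {

    // Hypothetical callback-style API: the callback may write output but cannot return a value.
    static void write(final Consumer<StringBuilder> callback) {
        final StringBuilder out = new StringBuilder();
        callback.accept(out);
    }

    public static void main(final String[] args) {
        // The reference is final, so the anonymous class may capture it,
        // yet the value it holds can still be updated from inside the callback.
        final AtomicLong nrOfRows = new AtomicLong(0L);

        write(new Consumer<StringBuilder>() {
            @Override
            public void accept(final StringBuilder out) {
                out.append("row-1\n").append("row-2\n");
                nrOfRows.set(2L); // value computed while writing
            }
        });

        System.out.println("rows written: " + nrOfRows.get());
    }
}

LongHolder plays the same role in the processor: the captured reference stays effectively final while the count it carries is set during the callback and read afterwards for logging and provenance.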

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a3b8e44a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index c9ad423..8dff244 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -21,6 +21,7 @@ import java.io.OutputStream;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import static java.sql.Types.*;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -34,104 +35,102 @@ import org.apache.avro.io.DatumWriter;
 
 /**
  *  JDBC / SQL common functions.
- *
  */
 public class JdbcCommon {
 
-       public static long convertToAvroStream(ResultSet rs, OutputStream outStream) throws SQLException, IOException {
-               
-               Schema schema = createSchema(rs);
-               GenericRecord rec = new GenericData.Record(schema);
-               
-               DatumWriter<GenericRecord> datumWriter          = new GenericDatumWriter<GenericRecord>(schema);
-               DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter);
-               dataFileWriter.create(schema, outStream);
-               
-               ResultSetMetaData meta = rs.getMetaData();
-               int nrOfColumns = meta.getColumnCount();
-               long nrOfRows = 0;
-               while (rs.next()) {
-                       for (int i = 1; i <= nrOfColumns; i++) {
-                               Object value = rs.getObject(i);
-                               rec.put(i-1, value);
-                       }
-                       dataFileWriter.append(rec);
-                       nrOfRows += 1;
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
+        final Schema schema = createSchema(rs);
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+
+            final ResultSetMetaData meta = rs.getMetaData();
+            final int nrOfColumns = meta.getColumnCount();
+            long nrOfRows = 0;
+            while (rs.next()) {
+                for (int i = 1; i <= nrOfColumns; i++) {
+                    final Object value = rs.getObject(i);
+                    rec.put(i - 1, value);
+                }
+                dataFileWriter.append(rec);
+                nrOfRows += 1;
+            }
+
+            return nrOfRows;
                }
-
-               dataFileWriter.close();
-               return nrOfRows;
        }
-       
-       public static Schema createSchema(ResultSet rs) throws SQLException {
-               
-               ResultSetMetaData meta = rs.getMetaData();
-               int nrOfColumns = meta.getColumnCount();
-               String tableName = meta.getTableName(1);
-               
-               FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
-               
-               /**
-                *      Some missing Avro types - Decimal, Date types.
-                * May need some additional work.
-                */
-               for (int i = 1; i <= nrOfColumns; i++)
+
+    public static Schema createSchema(final ResultSet rs) throws SQLException {
+        final ResultSetMetaData meta = rs.getMetaData();
+        final int nrOfColumns = meta.getColumnCount();
+        final String tableName = meta.getTableName(1);
+
+        final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
+
+        /**
+         * Some missing Avro types - Decimal, Date types. May need some
+         * additional work.
+         */
+        for (int i = 1; i <= nrOfColumns; i++) {
                        switch (meta.getColumnType(i)) {
-                       
-                       case java.sql.Types.CHAR:
-                       case java.sql.Types.LONGNVARCHAR:
-                       case java.sql.Types.LONGVARCHAR:
-                       case java.sql.Types.NCHAR:
-                       case java.sql.Types.NVARCHAR:
-                       case java.sql.Types.VARCHAR:
-                               builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-                               break;
-
-                       case java.sql.Types.BOOLEAN:
-                               builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
-                               break;
-
-                       case java.sql.Types.INTEGER:
-                       case java.sql.Types.SMALLINT:
-                       case java.sql.Types.TINYINT:
-                               builder.name(meta.getColumnName(i)).type().intType().noDefault();
-                               break;
-
-                       case java.sql.Types.BIGINT:
-                               builder.name(meta.getColumnName(i)).type().longType().noDefault();
-                               break;
-
-                       // java.sql.RowId is interface, is seems to be database implementation specific, let's convert to String
-                       case java.sql.Types.ROWID:
-                               builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-                               break;
-
-                       case java.sql.Types.FLOAT:
-                       case java.sql.Types.REAL:
-                               builder.name(meta.getColumnName(i)).type().floatType().noDefault();
-                               break;
-
-                       case java.sql.Types.DOUBLE:
-                               builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
-                               break;
-
-                       // TODO Did not find direct suitable type, need to be clarified!!!!
-                       case java.sql.Types.DECIMAL:
-                       case java.sql.Types.NUMERIC:
-                               builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-                               break;
-
-                       // TODO Did not find direct suitable type, need to be clarified!!!!
-                       case java.sql.Types.DATE:
-                       case java.sql.Types.TIME:
-                       case java.sql.Types.TIMESTAMP:
-                               builder.name(meta.getColumnName(i)).type().stringType().noDefault();
-                               break;
-
-                       default:
-                               break;
+                case CHAR:
+                case LONGNVARCHAR:
+                case LONGVARCHAR:
+                case NCHAR:
+                case NVARCHAR:
+                case VARCHAR:
+                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+                    break;
+
+                case BOOLEAN:
+                    builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
+                    break;
+
+                case INTEGER:
+                case SMALLINT:
+                case TINYINT:
+                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+                    break;
+
+                case BIGINT:
+                    builder.name(meta.getColumnName(i)).type().longType().noDefault();
+                    break;
+
+                // java.sql.RowId is an interface; it seems to be database
+                // implementation specific, so convert to String
+                case ROWID:
+                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+                    break;
+
+                case FLOAT:
+                case REAL:
+                    builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+                    break;
+
+                case DOUBLE:
+                    builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+                    break;
+
+                // Did not find direct suitable type, need to be clarified!!!!
+                case DECIMAL:
+                case NUMERIC:
+                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+                    break;
+
+                // Did not find direct suitable type, need to be clarified!!!!
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+                    break;
+
+                default:
+                    break;
                        }
+        }
+
                return builder.endRecord();
        }
-
 }
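
For context, convertToAvroStream serializes an entire ResultSet into an Avro data file on the supplied OutputStream and returns the number of rows written, with the Avro schema derived from the ResultSet metadata by createSchema. A rough usage sketch against an embedded Derby database follows; the database location, table name, and query are illustrative assumptions (the tests below create such tables via loadTestData2Database):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.processors.standard.util.JdbcCommon;

public class AvroStreamSketch {

    public static void main(final String[] args) throws Exception {
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");

        // Illustrative database path and table; assumes a PERSONS table already exists,
        // e.g. one populated by TestJdbcHugeStream.loadTestData2Database().
        try (final Connection con = DriverManager.getConnection("jdbc:derby:target/example_db;create=true");
             final Statement st = con.createStatement()) {

            final ResultSet rs = st.executeQuery("select * from persons");

            // Serialize the whole ResultSet into an in-memory Avro data file.
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final long rows = JdbcCommon.convertToAvroStream(rs, out);

            // Read the records back to confirm the round trip.
            long read = 0;
            try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(
                    new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>())) {
                GenericRecord record = null;
                while (reader.hasNext()) {
                    record = reader.next(record);
                    read++;
                }
            }
            System.out.println(rows + " rows serialized, " + read + " records read back");
        }
    }
}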

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a3b8e44a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index b9243ef..30da519 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -65,7 +65,7 @@ public class TestExecuteSQL {
     public static void setup() {
         System.setProperty("derby.stream.error.file", "target/derby.log");
     }
-    
+
     @Test
     public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
        invokeOnTrigger(null);
@@ -77,44 +77,45 @@ public class TestExecuteSQL {
        invokeOnTrigger(1);             // 1 second max time
     }
 
-    public void invokeOnTrigger(Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
+    public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
-        
+
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
 
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
-        
+
         runner.enableControllerService(dbcp);
         runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
-        
-        if (queryTimeout!=null)
+
+        if (queryTimeout != null) {
                runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
-        
+        }
+
         // remove previous test database, if any
-        File dbLocation = new File(DB_LOCATION);
+        final File dbLocation = new File(DB_LOCATION);
         dbLocation.delete();
 
         // load test data to database
-        Connection con = dbcp.getConnection();
+        final Connection con = dbcp.getConnection();
         TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
-        System.out.println("test data loaded");
-        
+        LOGGER.info("test data loaded");
+
         // ResultSet size will be 1x2000x1000 = 2 000 000 rows
         // because of where PER.ID = ${person.id}
         final int nrOfRows = 2000000;
-        String query = "select "
+        final String query = "select "
                        + "  PER.ID as PersonId, PER.NAME as PersonName, 
PER.CODE as PersonCode"
                        + ", PRD.ID as ProductId,PRD.NAME as 
ProductName,PRD.CODE as ProductCode"
                        + ", REL.ID as RelId,    REL.NAME as RelName,    
REL.CODE as RelCode"
                        + ", ROW_NUMBER() OVER () as rownr "
                        + " from persons PER, products PRD, relationships REL"
                        + " where PER.ID = ${person.id}";
-        
+
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
 
         // incoming FlowFile content is not used, but attributes are used
-        Map<String,String> attributes = new HashMap<String,String>();
+        final Map<String,String> attributes = new HashMap<String,String>();
         attributes.put("person.id", "10");
         runner.enqueue("Hello".getBytes(), attributes);
 
@@ -122,27 +123,27 @@ public class TestExecuteSQL {
         runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
 
         // read all Avro records and verify created FlowFile contains 1000000 records
-        List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
-        InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
-        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-        DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
+        final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
+        final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+        final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
         GenericRecord record = null;
         long recordsFromStream = 0;
         while (dataFileReader.hasNext()) {
               // Reuse record object by passing it to next(). This saves us from
               // allocating and garbage collecting many objects for files with many items.
                record = dataFileReader.next(record);
-//             System.out.println(record);
                recordsFromStream += 1;
         }
-        System.out.println("total nr of records from stream: " + 
recordsFromStream);
+
+        LOGGER.info("total nr of records from stream: " + recordsFromStream);
         assertEquals(nrOfRows, recordsFromStream);
         dataFileReader.close();
     }
 
-    
-    
-    
+
+
+
     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *
@@ -157,13 +158,13 @@ public class TestExecuteSQL {
                @Override
                public Connection getConnection() throws ProcessException {
                try {
-                               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-                               Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+                               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                               final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
                                return con;
-                       } catch (Exception e) {
+                       } catch (final Exception e) {
                                throw new ProcessException("getConnection failed: " + e);
                        }
-               }       
+               }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a3b8e44a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
index 8c54bc0..89c2fb3 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java
@@ -39,17 +39,18 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  *  Test streaming using large number of result set rows.
  * 1. Read data from database.
  * 2. Create Avro schema from ResultSet meta data.
- * 3. Read rows from ResultSet and write rows to Avro writer stream 
+ * 3. Read rows from ResultSet and write rows to Avro writer stream
  *    (Avro will create record for each row).
- * 4. And finally read records from Avro stream to verify all data is present in Avro stream. 
- *   
- *  
+ * 4. And finally read records from Avro stream to verify all data is present in Avro stream.
+ *
+ *
  * Sql query will return all combinations from 3 table.
  * For example when each table contain 1000 rows, result set will be 1 000 000 000 rows.
  *
@@ -63,7 +64,7 @@ public class TestJdbcHugeStream {
         System.setProperty("derby.stream.error.file", "target/derby.log");
     }
 
-    /**        
+    /**
      *         In case of large record set this will fail with
      * java.lang.OutOfMemoryError: Java heap space
         * at java.util.Arrays.copyOf(Arrays.java:2271)
@@ -71,149 +72,147 @@ public class TestJdbcHugeStream {
        * at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        * at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        * at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
-     * 
-     */    
-//     @Test
+     *
+     */
+    @Test
+    @Ignore
        public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException {
-               
+
         // remove previous test database, if any
-        File dbLocation = new File(DB_LOCATION);
+        final File dbLocation = new File(DB_LOCATION);
         dbLocation.delete();
 
-        Connection con = createConnection();
-        loadTestData2Database(con, 150, 150, 150);
-        System.out.println("test data loaded");
-        
-        Statement st = con.createStatement();
-        
-        // Notice!
-        // Following select is deliberately invalid!
-        // For testing we need huge amount of rows, so where part is not used.
-        ResultSet resultSet = st.executeQuery("select "
-                       + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
-                       + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
-                       + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
-                       + ", ROW_NUMBER() OVER () as rownr "
-                       + " from persons PER, products PRD, relationships REL");
-
-        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-        long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
-        System.out.println("total nr of rows in resultset: " + nrOfRows);
-
-        byte[] serializedBytes = outStream.toByteArray();
-        assertNotNull(serializedBytes);
-        System.out.println("Avro serialized result size in bytes: " + 
serializedBytes.length);
-
-        // Deserialize bytes to records
-        
-        InputStream instream = new ByteArrayInputStream(serializedBytes);
-        
-        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-        DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader);
-        GenericRecord record = null;
-        long recordsFromStream = 0;
-        while (dataFileReader.hasNext()) {
-               // Reuse record object by passing it to next(). This saves us from
-               // allocating and garbage collecting many objects for files with many items.
-               record = dataFileReader.next(record);
-//             System.out.println(record);
-               recordsFromStream += 1;
+        try (final Connection con = createConnection()) {
+            loadTestData2Database(con, 150, 150, 150);
+            System.out.println("test data loaded");
+
+            try (final Statement st = con.createStatement()) {
+                // Notice!
+                // Following select is deliberately invalid!
+                // For testing we need huge amount of rows, so where part is not
+                // used.
+                final ResultSet resultSet = st.executeQuery("select "
+                    + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+                    + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+                    + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+                    + ", ROW_NUMBER() OVER () as rownr "
+                    + " from persons PER, products PRD, relationships REL");
+
+                final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+                final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
+                System.out.println("total nr of rows in resultset: " + nrOfRows);
+
+                final byte[] serializedBytes = outStream.toByteArray();
+                assertNotNull(serializedBytes);
+                System.out.println("Avro serialized result size in bytes: " + 
serializedBytes.length);
+
+                // Deserialize bytes to records
+
+                final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+                final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+                try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+                    GenericRecord record = null;
+                    long recordsFromStream = 0;
+                    while (dataFileReader.hasNext()) {
+                        // Reuse record object by passing it to next(). This
+                        // saves us from
+                        // allocating and garbage collecting many objects for
+                        // files with many items.
+                        record = dataFileReader.next(record);
+                        recordsFromStream += 1;
+                    }
+                    System.out.println("total nr of records from stream: " + recordsFromStream);
+                    assertEquals(nrOfRows, recordsFromStream);
+                }
+            }
         }
-        System.out.println("total nr of records from stream: " + recordsFromStream);
-        assertEquals(nrOfRows, recordsFromStream);
-        st.close();
-        con.close();
        }
-               
+
        @Test
        public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException {
-               
+
         // remove previous test database, if any
-        File dbLocation = new File(DB_LOCATION);
+        final File dbLocation = new File(DB_LOCATION);
         dbLocation.delete();
 
-        Connection con = createConnection();
+        try (final Connection con = createConnection()) {
         loadTestData2Database(con, 300, 300, 300);
-        System.out.println("test data loaded");
-        
-        Statement st = con.createStatement();
-        
-        // Notice!
-        // Following select is deliberately invalid!
-        // For testing we need huge amount of rows, so where part is not used.
-        ResultSet resultSet = st.executeQuery("select "
-                       + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
-                       + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
-                       + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
-                       + ", ROW_NUMBER() OVER () as rownr "
-                       + " from persons PER, products PRD, relationships REL");
-
-        OutputStream outStream = new FileOutputStream("target/data.avro");
-        long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
-        System.out.println("total nr of rows in resultset: " + nrOfRows);
-/*
-        byte[] serializedBytes = outStream.toByteArray();
-        assertNotNull(serializedBytes);
-        System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
-*/
-        // Deserialize bytes to records
-        
-        InputStream instream = new FileInputStream("target/data.avro");
-        
-        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-        DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader);
-        GenericRecord record = null;
-        long recordsFromStream = 0;
-        while (dataFileReader.hasNext()) {
-               // Reuse record object by passing it to next(). This saves us from
-               // allocating and garbage collecting many objects for files with many items.
-               record = dataFileReader.next(record);
-//             System.out.println(record);
-               recordsFromStream += 1;
+
+            try (final Statement st = con.createStatement()) {
+                // Notice!
+                // Following select is deliberately invalid!
+                // For testing we need huge amount of rows, so where part is not
+                // used.
+                final ResultSet resultSet = st.executeQuery("select "
+                    + "  PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+                    + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+                    + ", REL.ID as RelId,    REL.NAME as RelName,    REL.CODE as RelCode"
+                    + ", ROW_NUMBER() OVER () as rownr "
+                    + " from persons PER, products PRD, relationships REL");
+
+                final OutputStream outStream = new FileOutputStream("target/data.avro");
+                final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
+
+                // Deserialize bytes to records
+                final InputStream instream = new FileInputStream("target/data.avro");
+
+                final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+                try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
+                    GenericRecord record = null;
+                    long recordsFromStream = 0;
+                    while (dataFileReader.hasNext()) {
+                        // Reuse record object by passing it to next(). This
+                        // saves us from
+                        // allocating and garbage collecting many objects for
+                        // files with many items.
+                        record = dataFileReader.next(record);
+                        recordsFromStream += 1;
+                    }
+                    System.out.println("total nr of records from stream: " + recordsFromStream);
+                    assertEquals(nrOfRows, recordsFromStream);
+                }
+            }
         }
-        System.out.println("total nr of records from stream: " + recordsFromStream);
-        assertEquals(nrOfRows, recordsFromStream);        
-        st.close();
-        con.close();
        }
-               
+
        //================================================  helpers  ===============================================
-       
-    static String dropPersons          = "drop table persons";
-    static String dropProducts         = "drop table products";
-    static String dropRelationships= "drop table relationships";
-    static String createPersons                = "create table persons         (id integer, name varchar(100), code integer)";
-    static String createProducts               = "create table products        (id integer, name varchar(100), code integer)";
-    static String createRelationships  = "create table relationships(id integer,name varchar(100), code integer)";
+
+    static String dropPersons = "drop table persons";
+    static String dropProducts = "drop table products";
+    static String dropRelationships = "drop table relationships";
+    static String createPersons = "create table persons (id integer, name varchar(100), code integer)";
+    static String createProducts = "create table products (id integer, name varchar(100), code integer)";
+    static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
 
        static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException {
-               
+
                System.out.println(createRandomName());
                System.out.println(createRandomName());
                System.out.println(createRandomName());
-               
-        Statement st = con.createStatement();
+
+        final Statement st = con.createStatement();
 
         // tables may not exist, this is not serious problem.
         try { st.executeUpdate(dropPersons);
-        } catch (Exception e) { }
-        
+        } catch (final Exception e) { }
+
         try { st.executeUpdate(dropProducts);
-        } catch (Exception e) { }
-        
+        } catch (final Exception e) { }
+
         try { st.executeUpdate(dropRelationships);
-        } catch (Exception e) { } 
+        } catch (final Exception e) { }
 
         st.executeUpdate(createPersons);
         st.executeUpdate(createProducts);
         st.executeUpdate(createRelationships);
-        
+
         for (int i = 0; i < nrOfPersons; i++)
                loadPersons(st, i);
-               
+
         for (int i = 0; i < nrOfProducts; i++)
                loadProducts(st, i);
-               
+
         for (int i = 0; i < nrOfRels; i++)
                loadRelationships(st, i);
 
@@ -223,41 +222,37 @@ public class TestJdbcHugeStream {
        static Random rng = new Random(53495);
 
        static private void loadPersons(Statement st, int nr) throws SQLException {
-               
-        st.executeUpdate("insert into persons values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );              
+        st.executeUpdate("insert into persons values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );
        }
 
        static private void loadProducts(Statement st, int nr) throws SQLException {
-               
-        st.executeUpdate("insert into products values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );             
+        st.executeUpdate("insert into products values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );
        }
 
        static private void loadRelationships(Statement st, int nr) throws SQLException {
-               
-        st.executeUpdate("insert into relationships values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );                
+        st.executeUpdate("insert into relationships values (" + nr + ", '" + 
createRandomName() +  "', " + rng.nextInt(469946) + ")" );
        }
 
        static private String createRandomName() {
                return createRandomString() + " " + createRandomString();
        }
-       
+
        static private String createRandomString() {
-               
-               int length = rng.nextInt(19);
-               String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
-               
-               char[] text = new char[length];
+
+               final int length = rng.nextInt(19);
+               final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+               final char[] text = new char[length];
            for (int i = 0; i < length; i++)
            {
                text[i] = characters.charAt(rng.nextInt(characters.length()));
            }
-           return new String(text);                    
+           return new String(text);
        }
-       
+
        private Connection createConnection() throws ClassNotFoundException, SQLException {
-               
-        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");        
-        Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
                return con;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a3b8e44a/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 039057e..0c71ba8 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -857,6 +857,11 @@
                 <artifactId>json-path</artifactId>
                 <version>2.0.0</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.derby</groupId>
+                <artifactId>derby</artifactId>
+                <version>10.11.1.1</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
     <dependencies>

