NIFI-626 - query timeout added

Signed-off-by: Toivo Adams <toivo.ad...@gmail.com>
Signed-off-by: Mark Payne <marka...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d98757e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d98757e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d98757e4

Branch: refs/heads/develop
Commit: d98757e4c79a0c036165e6278fc99909fc2ebcaf
Parents: 14bcad2
Author: Toivo Adams <toivo.ad...@gmail.com>
Authored: Sun May 31 16:43:49 2015 +0300
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jun 17 12:35:55 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 15 +++++++++++-
 .../processors/standard/TestExecuteSQL.java     | 25 ++++++++++++++++----
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d98757e4/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 37fd0f7..07e6fe9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -40,7 +40,6 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -79,6 +78,16 @@ public class ExecuteSQL extends AbstractProcessor {
                .expressionLanguageSupported(true)
                .build();
 
+    public static final PropertyDescriptor QUERY_TIMEOUT = new 
PropertyDescriptor.Builder()
+    .name("Max Wait Time")
+    .description("The maximum amount of time allowed for a running SQL select 
query "
+        + " , zero means there is no limit. Max time less than 1 second will 
be equal to zero.")
+        .defaultValue("0 seconds")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .sensitive(false)
+        .build();
+
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
@@ -89,6 +98,7 @@ public class ExecuteSQL extends AbstractProcessor {
         ArrayList<PropertyDescriptor> pds = new ArrayList<>();
         pds.add(DBCP_SERVICE);
         pds.add(SQL_SELECT_QUERY);
+        pds.add(QUERY_TIMEOUT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -113,6 +123,8 @@ public class ExecuteSQL extends AbstractProcessor {
 
         final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final String selectQuery = 
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
+        final Integer queryTimeout = 
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+
                final StopWatch stopWatch = new StopWatch(true);
         
         try {
@@ -120,6 +132,7 @@ public class ExecuteSQL extends AbstractProcessor {
                        try {
                                final Statement st = con.createStatement();
                                try {
+                                       st.setQueryTimeout(queryTimeout);       
// timeout in seconds
                                        FlowFile outgoing = 
session.write(incoming, new OutputStreamCallback() {
                                                @Override
                                                public void process(final 
OutputStream out) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d98757e4/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 1b6de83..b9243ef 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -34,7 +34,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
@@ -68,7 +67,17 @@ public class TestExecuteSQL {
     }
     
     @Test
-    public void test1() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
+    public void testNoTimeLimit() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
+       invokeOnTrigger(null);
+    }
+
+    @Test
+    public void testQueryTimeout() throws InitializationException, 
ClassNotFoundException, SQLException, IOException {
+       // Does not seem to have any effect when using embedded Derby
+       invokeOnTrigger(1);             // 1 second max time
+    }
+
+    public void invokeOnTrigger(Integer queryTimeout) throws 
InitializationException, ClassNotFoundException, SQLException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
         
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
@@ -79,18 +88,21 @@ public class TestExecuteSQL {
         runner.enableControllerService(dbcp);
         runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
         
+        if (queryTimeout!=null)
+               runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, 
queryTimeout.toString() + " secs");
+        
         // remove previous test database, if any
         File dbLocation = new File(DB_LOCATION);
         dbLocation.delete();
 
         // load test data to database
         Connection con = dbcp.getConnection();
-        TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100);
+        TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
         System.out.println("test data loaded");
         
-        // ResultSet size will be 1x100x100 = 10000 rows
+        // ResultSet size will be 1x2000x1000 = 2 000 000 rows
         // because of where PER.ID = ${person.id}
-        final int nrOfRows = 10000;
+        final int nrOfRows = 2000000;
         String query = "select "
                        + "  PER.ID as PersonId, PER.NAME as PersonName, 
PER.CODE as PersonCode"
                        + ", PRD.ID as ProductId,PRD.NAME as 
ProductName,PRD.CODE as ProductCode"
@@ -128,6 +140,9 @@ public class TestExecuteSQL {
         dataFileReader.close();
     }
 
+    
+    
+    
     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *

Reply via email to