DRILL-2884: 2-Hygiene: Associated cleanup, mostly of DrillResultSetImpl.

- Re-ordered the UserResultsListener methods implemented in ResultsListener
  logically (chronologically).  (In the UserResultsListener interface too.)
- Re-ordered some other ResultsListener methods more logically
  (chronologically).
- Added ResultsListener logging (incoming and outgoing).
- Moved queryId down into ResultsListener so ResultsListener doesn't need to
  reach up into DrillResultSetImpl; changed ResultsListener from an inner
  class to a static nested class.
- Renamed:
  - Fixed typo "resultslistener" -> "resultsListener"
  - MAX -> THROTTLING_QUEUE_SIZE_THRESHOLD, queue -> batchQueue,
    ex -> executionFailureException
- Added, purged some TODOs.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7b776e7f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7b776e7f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7b776e7f

Branch: refs/heads/master
Commit: 7b776e7fb94b1b2b57fe3bc0b586047c47a5f042
Parents: f9efc3b
Author: dbarclay <dbarc...@maprtech.com>
Authored: Sat Apr 25 21:23:10 2015 -0700
Committer: Parth Chandra <pchan...@maprtech.com>
Committed: Tue May 5 17:40:13 2015 -0700

----------------------------------------------------------------------
 .../exec/rpc/user/UserResultsListener.java      |  14 +--
 .../java/org/apache/drill/jdbc/DrillCursor.java |   2 +-
 .../drill/jdbc/impl/DrillResultSetImpl.java     | 112 ++++++++++++-------
 3 files changed, 77 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index e422a3f..01a44b8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -37,17 +37,17 @@ public interface UserResultsListener {
   void submissionFailed(UserException ex);
 
   /**
-   * The query has completed (successsful completion or cancellation). The 
listener will not receive any other
-   * data or result message. Called when the server returns a terminal-non 
failing- state (COMPLETED or CANCELLED)
-   * @param state
-   */
-  void queryCompleted(QueryState state);
-
-  /**
    * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message 
was received
    * @param result data batch received
    * @param throttle connection throttle
    */
   void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
 
+  /**
+   * The query has completed (successsful completion or cancellation). The 
listener will not receive any other
+   * data or result message. Called when the server returns a terminal-non 
failing- state (COMPLETED or CANCELLED)
+   * @param state
+   */
+  void queryCompleted(QueryState state);
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index 41b89ce..6bad3ce 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -65,7 +65,7 @@ public class DrillCursor implements Cursor {
   public DrillCursor(final DrillResultSetImpl resultSet) {
     this.resultSet = resultSet;
     currentBatch = resultSet.currentBatch;
-    resultsListener = resultSet.resultslistener;
+    resultsListener = resultSet.resultsListener;
   }
 
   public DrillResultSetImpl getResultSet() {

http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
----------------------------------------------------------------------
diff --git 
a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java 
b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
index 35674bf..a7cc0c1 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java
@@ -47,31 +47,31 @@ import org.apache.drill.jdbc.DrillResultSet;
 import org.apache.drill.jdbc.ExecutionCanceledSqlException;
 import org.apache.drill.jdbc.SchemaChangeListener;
 
-import com.google.common.collect.Queues;
+import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
 
+import com.google.common.collect.Queues;
 
-//???? Split this into interface org.apache.drill.jdbc.DrillResultSet for 
published
-// interface and a class probably named 
org.apache.drill.jdbc.impl.DrillResultSetImpl.
-// Add any needed documentation of Drill-specific behavior of JDBC-defined
-// ResultSet methods to new DrillResultSet. ...
 
 public class DrillResultSetImpl extends AvaticaResultSet implements 
DrillResultSet {
+  @SuppressWarnings("unused")
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class);
 
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
   public SchemaChangeListener changeListener;
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
-  public final ResultsListener resultslistener = new ResultsListener();
-  private volatile QueryId queryId;
+  public final ResultsListener resultsListener = new ResultsListener();
   private final DrillClient client;
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
+  // TODO:  Resolve:  Since is barely manipulated here in DrillResultSetImpl,
+  //  move down into DrillCursor and have this.clean() have cursor clean it.
   public final RecordBatchLoader currentBatch;
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
   public final DrillCursor cursor;
   public boolean hasPendingCancelationNotification;
 
   public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult 
prepareResult,
-      ResultSetMetaData resultSetMetaData, TimeZone timeZone) {
+                            ResultSetMetaData resultSetMetaData, TimeZone 
timeZone) {
     super(statement, prepareResult, resultSetMetaData, timeZone);
     DrillConnection c = (DrillConnection) statement.getConnection();
     DrillClient client = c.getClient();
@@ -120,10 +120,10 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
 
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
   public synchronized void cleanup() {
-    if (queryId != null && ! resultslistener.completed) {
-      client.cancelQuery(queryId);
+    if (resultsListener.getQueryId() != null && ! resultsListener.completed) {
+      client.cancelQuery(resultsListener.getQueryId());
     }
-    resultslistener.close();
+    resultsListener.close();
     currentBatch.clear();
   }
 
@@ -145,45 +145,60 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
     DrillConnectionImpl connection = (DrillConnectionImpl) 
statement.getConnection();
 
     connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(),
-                                    resultslistener);
+                                    resultsListener);
     connection.getDriver().handler.onStatementExecute(statement, null);
 
     super.execute();
 
     // don't return with metadata until we've achieved at least one return 
message.
     try {
-      resultslistener.latch.await();
+      // TODO:  Revisit:  Why reaching directly into ResultsListener rather 
than
+      // calling some wait method?
+      resultsListener.latch.await();
       cursor.next();
     } catch (InterruptedException e) {
-     // TODO:  Check:  Should this call Thread.currentThread.interrupt()?   If
-     // not, at least document why this is empty.
+      // TODO:  Check:  Should this call Thread.currentThread.interrupt()?   If
+      // not, at least document why this is empty.
+      // TODO:  Check:  Does anything ever interrupt this?  (Is catch needed?)
     }
 
     return this;
   }
 
   public String getQueryId() {
-    if (queryId != null) {
-      return QueryIdHelper.getQueryId(queryId);
+    if (resultsListener.queryId != null) {
+      return QueryIdHelper.getQueryId(resultsListener.queryId);
     } else {
       return null;
     }
   }
 
   // (Public until JDBC impl. classes moved out of published-intf. package. 
(DRILL-2089).)
-  public class ResultsListener implements UserResultsListener {
-    private static final int MAX = 100;
-    private volatile UserException ex;
+  public static class ResultsListener implements UserResultsListener {
+    private static Logger logger = getLogger( ResultsListener.class );
+
+    private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100;
+
+    private volatile QueryId queryId;
+
+
+    private volatile UserException executionFailureException;
     volatile boolean completed = false;
     private volatile boolean autoread = true;
     private volatile ConnectionThrottle throttle;
     private volatile boolean closed = false;
+    // TODO:  Rename.  It's obvious it's a latch--but what condition or action
+    // does it represent or control?
     private CountDownLatch latch = new CountDownLatch(1);
     private AtomicBoolean receivedMessage = new AtomicBoolean(false);
 
+    final LinkedBlockingDeque<QueryDataBatch> batchQueue =
+        Queues.newLinkedBlockingDeque();
 
 
-    final LinkedBlockingDeque<QueryDataBatch> queue = 
Queues.newLinkedBlockingDeque();
+    ResultsListener() {
+      logger.debug( "Query listener created." );
+    }
 
     // TODO:  Doc.:  Release what if what is first relative to what?
     private boolean releaseIfFirst() {
@@ -196,22 +211,23 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
     }
 
     @Override
-    public void submissionFailed(UserException ex) {
-      this.ex = ex;
-      completed = true;
-      close();
-      System.out.println("Query failed: " + ex.getMessage());
+    public void queryIdArrived(QueryId queryId) {
+      logger.debug( "Received query ID: {}.", queryId );
+      this.queryId = queryId;
     }
 
     @Override
-    public void queryCompleted(QueryState state) {
-      releaseIfFirst();
+    public void submissionFailed(UserException ex) {
+      logger.debug( "Received query failure.", ex );
+      this.executionFailureException = ex;
       completed = true;
+      close();
+      logger.info( "Query failed: ", ex );
     }
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle 
throttle) {
-      logger.debug("Result arrived {}", result);
+      logger.debug( "Received query data: {}.", result );
 
       // If we're in a closed state, just release the message.
       if (closed) {
@@ -221,8 +237,8 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
       }
 
       // We're active; let's add to the queue.
-      queue.add(result);
-      if (queue.size() >= MAX - 1) {
+      batchQueue.add(result);
+      if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) {
         throttle.setAutoRead(false);
         this.throttle = throttle;
         autoread = false;
@@ -231,22 +247,36 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
       releaseIfFirst();
     }
 
+    @Override
+    public void queryCompleted(QueryState state) {
+      logger.debug( "Query completion arrived: {}.", state );
+      releaseIfFirst();
+      completed = true;
+    }
+
+    public QueryId getQueryId() {
+      return queryId;
+    }
+
+
     // TODO:  Doc.:  Specify whether result can be null and what that means.
     public QueryDataBatch getNext() throws Exception {
       while (true) {
-        if (ex != null) {
-          throw ex;
+        if (executionFailureException != null) {
+          logger.debug( "Dequeued query failure exception: {}.", 
executionFailureException );
+          throw executionFailureException;
         }
-        if (completed && queue.isEmpty()) {
+        if (completed && batchQueue.isEmpty()) {
           return null;
         } else {
-          QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+          QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS);
           if (q != null) {
-            if (!autoread && queue.size() < MAX / 2) {
+            if (!autoread && batchQueue.size() < 
THROTTLING_QUEUE_SIZE_THRESHOLD / 2) {
               autoread = true;
               throttle.setAutoRead(true);
               throttle = null;
             }
+            logger.debug( "Dequeued query data: {}.", q );
             return q;
           }
         }
@@ -255,22 +285,18 @@ public class DrillResultSetImpl extends AvaticaResultSet 
implements DrillResultS
 
     void close() {
       closed = true;
-      while (!queue.isEmpty()) {
-        QueryDataBatch qrb = queue.poll();
+      while (!batchQueue.isEmpty()) {
+        QueryDataBatch qrb = batchQueue.poll();
         if (qrb != null && qrb.getData() != null) {
           qrb.getData().release();
         }
       }
       // close may be called before the first result is received and the main 
thread is blocked waiting
       // for the result. In that case we want to unblock the main thread.
-      latch.countDown();
+      latch.countDown(); // TODO:  Why not call releaseIfFirst as used 
elsewhere?
       completed = true;
     }
 
-    @Override
-    public void queryIdArrived(QueryId queryId) {
-      DrillResultSetImpl.this.queryId = queryId;
-    }
   }
 
 }

Reply via email to