DRILL-2884: 2-Hygiene: Associated cleanup, mostly of DrillResultSetImpl. - Re-ordered UserResultsListener methods logically (chronologically). (In UserResultsListener too.) - Re-ordered some other ResultsListener methods more logically (chronologically). - Added ResultsListener logging (incoming and outgoing). - Moved queryId down into ResultsListener so ResultsListener doesn't need to reach up into DrillResultSetImpl; changed from inner to static nested class. - Renamed: - Fixed typo "resultslistener" -> "resultsListener" - MAX -> THROTTLING_QUEUE_SIZE_THRESHOLD, queue -> batchQueue, ex -> executionFailureException - Added, purged some TODOs.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7b776e7f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7b776e7f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7b776e7f Branch: refs/heads/master Commit: 7b776e7fb94b1b2b57fe3bc0b586047c47a5f042 Parents: f9efc3b Author: dbarclay <dbarc...@maprtech.com> Authored: Sat Apr 25 21:23:10 2015 -0700 Committer: Parth Chandra <pchan...@maprtech.com> Committed: Tue May 5 17:40:13 2015 -0700 ---------------------------------------------------------------------- .../exec/rpc/user/UserResultsListener.java | 14 +-- .../java/org/apache/drill/jdbc/DrillCursor.java | 2 +- .../drill/jdbc/impl/DrillResultSetImpl.java | 112 ++++++++++++------- 3 files changed, 77 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java index e422a3f..01a44b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java @@ -37,17 +37,17 @@ public interface UserResultsListener { void submissionFailed(UserException ex); /** - * The query has completed (successsful completion or cancellation). The listener will not receive any other - * data or result message. 
Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED) - * @param state - */ - void queryCompleted(QueryState state); - - /** * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received * @param result data batch received * @param throttle connection throttle */ void dataArrived(QueryDataBatch result, ConnectionThrottle throttle); + /** + * The query has completed (successsful completion or cancellation). The listener will not receive any other + * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED) + * @param state + */ + void queryCompleted(QueryState state); + } http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java index 41b89ce..6bad3ce 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java @@ -65,7 +65,7 @@ public class DrillCursor implements Cursor { public DrillCursor(final DrillResultSetImpl resultSet) { this.resultSet = resultSet; currentBatch = resultSet.currentBatch; - resultsListener = resultSet.resultslistener; + resultsListener = resultSet.resultsListener; } public DrillResultSetImpl getResultSet() { http://git-wip-us.apache.org/repos/asf/drill/blob/7b776e7f/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java index 35674bf..a7cc0c1 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java 
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -47,31 +47,31 @@ import org.apache.drill.jdbc.DrillResultSet; import org.apache.drill.jdbc.ExecutionCanceledSqlException; import org.apache.drill.jdbc.SchemaChangeListener; -import com.google.common.collect.Queues; +import static org.slf4j.LoggerFactory.getLogger; +import org.slf4j.Logger; +import com.google.common.collect.Queues; -//???? Split this into interface org.apache.drill.jdbc.DrillResultSet for published -// interface and a class probably named org.apache.drill.jdbc.impl.DrillResultSetImpl. -// Add any needed documentation of Drill-specific behavior of JDBC-defined -// ResultSet methods to new DrillResultSet. ... public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { + @SuppressWarnings("unused") private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class); // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) public SchemaChangeListener changeListener; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) - public final ResultsListener resultslistener = new ResultsListener(); - private volatile QueryId queryId; + public final ResultsListener resultsListener = new ResultsListener(); private final DrillClient client; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + // TODO: Resolve: Since is barely manipulated here in DrillResultSetImpl, + // move down into DrillCursor and have this.clean() have cursor clean it. public final RecordBatchLoader currentBatch; // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) 
public final DrillCursor cursor; public boolean hasPendingCancelationNotification; public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult, - ResultSetMetaData resultSetMetaData, TimeZone timeZone) { + ResultSetMetaData resultSetMetaData, TimeZone timeZone) { super(statement, prepareResult, resultSetMetaData, timeZone); DrillConnection c = (DrillConnection) statement.getConnection(); DrillClient client = c.getClient(); @@ -120,10 +120,10 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) public synchronized void cleanup() { - if (queryId != null && ! resultslistener.completed) { - client.cancelQuery(queryId); + if (resultsListener.getQueryId() != null && ! resultsListener.completed) { + client.cancelQuery(resultsListener.getQueryId()); } - resultslistener.close(); + resultsListener.close(); currentBatch.clear(); } @@ -145,45 +145,60 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS DrillConnectionImpl connection = (DrillConnectionImpl) statement.getConnection(); connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(), - resultslistener); + resultsListener); connection.getDriver().handler.onStatementExecute(statement, null); super.execute(); // don't return with metadata until we've achieved at least one return message. try { - resultslistener.latch.await(); + // TODO: Revisit: Why reaching directly into ResultsListener rather than + // calling some wait method? + resultsListener.latch.await(); cursor.next(); } catch (InterruptedException e) { - // TODO: Check: Should this call Thread.currentThread.interrupt()? If - // not, at least document why this is empty. + // TODO: Check: Should this call Thread.currentThread.interrupt()? If + // not, at least document why this is empty. + // TODO: Check: Does anything ever interrupt this? (Is catch needed?) 
} return this; } public String getQueryId() { - if (queryId != null) { - return QueryIdHelper.getQueryId(queryId); + if (resultsListener.queryId != null) { + return QueryIdHelper.getQueryId(resultsListener.queryId); } else { return null; } } // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) - public class ResultsListener implements UserResultsListener { - private static final int MAX = 100; - private volatile UserException ex; + public static class ResultsListener implements UserResultsListener { + private static Logger logger = getLogger( ResultsListener.class ); + + private static final int THROTTLING_QUEUE_SIZE_THRESHOLD = 100; + + private volatile QueryId queryId; + + + private volatile UserException executionFailureException; volatile boolean completed = false; private volatile boolean autoread = true; private volatile ConnectionThrottle throttle; private volatile boolean closed = false; + // TODO: Rename. It's obvious it's a latch--but what condition or action + // does it represent or control? private CountDownLatch latch = new CountDownLatch(1); private AtomicBoolean receivedMessage = new AtomicBoolean(false); + final LinkedBlockingDeque<QueryDataBatch> batchQueue = + Queues.newLinkedBlockingDeque(); - final LinkedBlockingDeque<QueryDataBatch> queue = Queues.newLinkedBlockingDeque(); + ResultsListener() { + logger.debug( "Query listener created." ); + } // TODO: Doc.: Release what if what is first relative to what? 
private boolean releaseIfFirst() { @@ -196,22 +211,23 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS } @Override - public void submissionFailed(UserException ex) { - this.ex = ex; - completed = true; - close(); - System.out.println("Query failed: " + ex.getMessage()); + public void queryIdArrived(QueryId queryId) { + logger.debug( "Received query ID: {}.", queryId ); + this.queryId = queryId; } @Override - public void queryCompleted(QueryState state) { - releaseIfFirst(); + public void submissionFailed(UserException ex) { + logger.debug( "Received query failure.", ex ); + this.executionFailureException = ex; completed = true; + close(); + logger.info( "Query failed: ", ex ); } @Override public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { - logger.debug("Result arrived {}", result); + logger.debug( "Received query data: {}.", result ); // If we're in a closed state, just release the message. if (closed) { @@ -221,8 +237,8 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS } // We're active; let's add to the queue. - queue.add(result); - if (queue.size() >= MAX - 1) { + batchQueue.add(result); + if (batchQueue.size() >= THROTTLING_QUEUE_SIZE_THRESHOLD - 1) { throttle.setAutoRead(false); this.throttle = throttle; autoread = false; @@ -231,22 +247,36 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS releaseIfFirst(); } + @Override + public void queryCompleted(QueryState state) { + logger.debug( "Query completion arrived: {}.", state ); + releaseIfFirst(); + completed = true; + } + + public QueryId getQueryId() { + return queryId; + } + + // TODO: Doc.: Specify whether result can be null and what that means. 
public QueryDataBatch getNext() throws Exception { while (true) { - if (ex != null) { - throw ex; + if (executionFailureException != null) { + logger.debug( "Dequeued query failure exception: {}.", executionFailureException ); + throw executionFailureException; } - if (completed && queue.isEmpty()) { + if (completed && batchQueue.isEmpty()) { return null; } else { - QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS); + QueryDataBatch q = batchQueue.poll(50, TimeUnit.MILLISECONDS); if (q != null) { - if (!autoread && queue.size() < MAX / 2) { + if (!autoread && batchQueue.size() < THROTTLING_QUEUE_SIZE_THRESHOLD / 2) { autoread = true; throttle.setAutoRead(true); throttle = null; } + logger.debug( "Dequeued query data: {}.", q ); return q; } } @@ -255,22 +285,18 @@ public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultS void close() { closed = true; - while (!queue.isEmpty()) { - QueryDataBatch qrb = queue.poll(); + while (!batchQueue.isEmpty()) { + QueryDataBatch qrb = batchQueue.poll(); if (qrb != null && qrb.getData() != null) { qrb.getData().release(); } } // close may be called before the first result is received and the main thread is blocked waiting // for the result. In that case we want to unblock the main thread. - latch.countDown(); + latch.countDown(); // TODO: Why not call releaseIfFirst as used elsewhere? completed = true; } - @Override - public void queryIdArrived(QueryId queryId) { - DrillResultSetImpl.this.queryId = queryId; - } } }