This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b34b4353f4 Async reads for JDBC (#13196)
b34b4353f4 is described below

commit b34b4353f4a27065b37feac97995d4984334f8ed
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Oct 18 11:40:57 2022 -0700

    Async reads for JDBC (#13196)
    
    Async reads for JDBC:
    Prevents JDBC timeouts on long queries by returning empty batches
    when a batch fetch takes too long. Uses an async model to run the
    result fetch concurrently with JDBC requests.
    
    Fixed race condition in Druid's Avatica server-side handler
    Fixed issue with no-user connections
---
 .../druid/java/util/common/concurrent/Execs.java   |   1 -
 docs/configuration/index.md                        |   1 +
 .../druid/server/AsyncQueryForwardingServlet.java  |   2 +-
 sql/pom.xml                                        |   6 +-
 .../sql/avatica/AbstractDruidJdbcStatement.java    |   6 +-
 .../druid/sql/avatica/AvaticaServerConfig.java     |  17 +
 .../druid/sql/avatica/DruidAvaticaJsonHandler.java |   2 +-
 .../sql/avatica/DruidAvaticaProtobufHandler.java   |   2 +-
 .../apache/druid/sql/avatica/DruidConnection.java  |  86 ++--
 .../sql/avatica/DruidJdbcPreparedStatement.java    |   8 +-
 .../druid/sql/avatica/DruidJdbcResultSet.java      | 237 +++++++++--
 .../druid/sql/avatica/DruidJdbcStatement.java      |  14 +-
 .../org/apache/druid/sql/avatica/DruidMeta.java    | 123 +++---
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java | 466 +++++++++++++--------
 .../avatica/DruidAvaticaProtobufHandlerTest.java   |   5 +-
 .../druid/sql/avatica/DruidStatementTest.java      |   8 +-
 .../druid/sql/calcite/util/CalciteTests.java       |   6 +-
 17 files changed, 661 insertions(+), 329 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java 
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
index a310e56756..c5bc20f45f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class Execs
 {
-
   /**
    * Returns an ExecutorService which is terminated and shutdown from the 
beginning and not able to accept any tasks.
    */
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cb826ef348..6e92076eb7 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following 
properties on the Broke
 |`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC 
client `Statement.setFetchSize` method. The value for this property must 
greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser 
value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than 
`minRowsPerFrame`, Druid uses the minimum value of the two. For handling 
queries which produce results with a large number of rows, you can increase 
this value to reduce the number of f [...]
 |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous 
open statements per Avatica client connection.|4|
 |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle 
timeout.|PT5M|
+|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds. 
When a request for the next batch of data takes longer than this time, Druid 
returns an empty result set, causing the client to poll again. This avoids HTTP 
timeouts for long-running queries. The default of 5 sec. is good for most 
cases. |5000|
 |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at 
`/druid/v2/sql/`.|true|
 |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN 
query](../querying/topnquery.md). Higher limits will be planned as [GroupBy 
queries](../querying/groupbyquery.md) instead.|100000|
 |`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata 
refreshes.|PT1M|
diff --git 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index fa3b52669b..75b13a39f1 100644
--- 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -402,7 +402,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       HttpServletResponse response
   ) throws ServletException, IOException
   {
-    // Just call the superclass service method. Overriden in tests.
+    // Just call the superclass service method. Overridden in tests.
     super.service(request, response);
   }
 
diff --git a/sql/pom.xml b/sql/pom.xml
index bbf2eb049e..28f7eecffc 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -247,9 +247,13 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
+       <dependency>
+         <groupId>org.jdbi</groupId>
+         <artifactId>jdbi</artifactId>
+         <scope>test</scope>
+       </dependency>
   </dependencies>
 
   <build>
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
index 697ad1ca17..1992c1620b 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PrepareResult;
 
@@ -56,16 +57,19 @@ public abstract class AbstractDruidJdbcStatement implements 
Closeable
 
   protected final String connectionId;
   protected final int statementId;
+  protected final ResultFetcherFactory fetcherFactory;
   protected Throwable throwable;
   protected DruidJdbcResultSet resultSet;
 
   public AbstractDruidJdbcStatement(
       final String connectionId,
-      final int statementId
+      final int statementId,
+      final ResultFetcherFactory fetcherFactory
   )
   {
     this.connectionId = Preconditions.checkNotNull(connectionId, 
"connectionId");
     this.statementId = statementId;
+    this.fetcherFactory = fetcherFactory;
   }
 
   protected static Meta.Signature createSignature(
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
index e931c3a289..a5215c19c0 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
@@ -30,6 +30,7 @@ class AvaticaServerConfig
   public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
   public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
   public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;
+  public static int DEFAULT_FETCH_TIMEOUT_MS = 5000;
 
   @JsonProperty
   public int maxConnections = DEFAULT_MAX_CONNECTIONS;
@@ -46,6 +47,17 @@ class AvaticaServerConfig
   @JsonProperty
   public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;
 
+  /**
+   * The maximum amount of time to wait per-fetch for the next result set.
+   * If a query takes longer than this amount of time, then the fetch will
+   * return 0 rows, without EOF, and the client will automatically try
+   * another fetch. The result is an async protocol that avoids network
+   * timeouts for long-running queries, especially those that take a long
+   * time to deliver a first batch of results.
+   */
+  @JsonProperty
+  public int fetchTimeoutMs = DEFAULT_FETCH_TIMEOUT_MS;
+
   public int getMaxConnections()
   {
     return maxConnections;
@@ -77,4 +89,9 @@ class AvaticaServerConfig
     }
     return minRowsPerFrame;
   }
+
+  public int getFetchTimeoutMs()
+  {
+    return fetchTimeoutMs;
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
index e2d223d126..4f1a5818bf 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
 
 public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
 {
-  public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
   public static final String AVATICA_PATH_NO_TRAILING_SLASH = 
"/druid/v2/sql/avatica";
+  public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + 
"/";
 
   @Inject
   public DruidAvaticaJsonHandler(
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
index 50c54ad27a..a15efadda6 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
 
 public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
 {
-  public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";
   public static final String AVATICA_PATH_NO_TRAILING_SLASH = 
"/druid/v2/sql/avatica-protobuf";
+  public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH + 
"/";
 
   @Inject
   public DruidAvaticaProtobufHandler(
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
index 23f1a222dd..5aa3f422b7 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
@@ -24,8 +24,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.sql.PreparedStatement;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 
 import java.util.Collections;
 import java.util.Map;
@@ -37,6 +39,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Connection tracking for {@link DruidMeta}. Thread-safe.
+ * <p>
+ * Lock is the instance itself. Used here to protect two members, and in
+ * other code when we must resolve the connection after resolving the 
statement.
+ * The lock prevents closing the connection concurrently with an operation on
+ * a statement for that connection.
  */
 public class DruidConnection
 {
@@ -56,13 +63,12 @@ public class DruidConnection
   private final AtomicInteger statementCounter = new AtomicInteger();
   private final AtomicReference<Future<?>> timeoutFuture = new 
AtomicReference<>();
 
-  // Typically synchronized by connectionLock, except in one case: the onClose 
function passed
+  // Typically synchronized by this instance, except in one case: the onClose 
function passed
   // into DruidStatements contained by the map.
-  @GuardedBy("connectionLock")
+  @GuardedBy("this")
   private final ConcurrentMap<Integer, AbstractDruidJdbcStatement> statements 
= new ConcurrentHashMap<>();
-  private final Object connectionLock = new Object();
 
-  @GuardedBy("connectionLock")
+  @GuardedBy("this")
   private boolean open = true;
 
   public DruidConnection(
@@ -93,13 +99,14 @@ public class DruidConnection
     return userSecret;
   }
 
-  public DruidJdbcStatement createStatement(
-      final SqlStatementFactory sqlStatementFactory
+  public synchronized DruidJdbcStatement createStatement(
+      final SqlStatementFactory sqlStatementFactory,
+      final ResultFetcherFactory fetcherFactory
   )
   {
     final int statementId = statementCounter.incrementAndGet();
 
-    synchronized (connectionLock) {
+    synchronized (this) {
       if (statements.containsKey(statementId)) {
         // Will only happen if statementCounter rolls over before old 
statements are cleaned up. If this
         // ever happens then something fishy is going on, because we shouldn't 
have billions of statements.
@@ -114,7 +121,9 @@ public class DruidConnection
       final DruidJdbcStatement statement = new DruidJdbcStatement(
           connectionId,
           statementId,
-          sqlStatementFactory
+          sessionContext,
+          sqlStatementFactory,
+          fetcherFactory
       );
 
       statements.put(statementId, statement);
@@ -123,15 +132,16 @@ public class DruidConnection
     }
   }
 
-  public DruidJdbcPreparedStatement createPreparedStatement(
+  public synchronized DruidJdbcPreparedStatement createPreparedStatement(
       final SqlStatementFactory sqlStatementFactory,
       final SqlQueryPlus sqlQueryPlus,
-      final long maxRowCount
+      final long maxRowCount,
+      final ResultFetcherFactory fetcherFactory
   )
   {
     final int statementId = statementCounter.incrementAndGet();
 
-    synchronized (connectionLock) {
+    synchronized (this) {
       if (statements.containsKey(statementId)) {
         // Will only happen if statementCounter rolls over before old 
statements are cleaned up. If this
         // ever happens then something fishy is going on, because we shouldn't 
have billions of statements.
@@ -143,11 +153,15 @@ public class DruidConnection
       }
 
       @SuppressWarnings("GuardedBy")
+      final PreparedStatement statement = 
sqlStatementFactory.preparedStatement(
+          sqlQueryPlus.withContext(sessionContext)
+      );
       final DruidJdbcPreparedStatement jdbcStmt = new 
DruidJdbcPreparedStatement(
           connectionId,
           statementId,
-          sqlStatementFactory.preparedStatement(sqlQueryPlus),
-          maxRowCount
+          statement,
+          maxRowCount,
+          fetcherFactory
       );
 
       statements.put(statementId, jdbcStmt);
@@ -156,17 +170,15 @@ public class DruidConnection
     }
   }
 
-  public AbstractDruidJdbcStatement getStatement(final int statementId)
+  public synchronized AbstractDruidJdbcStatement getStatement(final int 
statementId)
   {
-    synchronized (connectionLock) {
-      return statements.get(statementId);
-    }
+    return statements.get(statementId);
   }
 
   public void closeStatement(int statementId)
   {
     AbstractDruidJdbcStatement stmt;
-    synchronized (connectionLock) {
+    synchronized (this) {
       stmt = statements.remove(statementId);
     }
     if (stmt != null) {
@@ -180,34 +192,30 @@ public class DruidConnection
    *
    * @return true if closed
    */
-  public boolean closeIfEmpty()
+  public synchronized boolean closeIfEmpty()
   {
-    synchronized (connectionLock) {
-      if (statements.isEmpty()) {
-        close();
-        return true;
-      } else {
-        return false;
-      }
+    if (statements.isEmpty()) {
+      close();
+      return true;
+    } else {
+      return false;
     }
   }
 
-  public void close()
+  public synchronized void close()
   {
-    synchronized (connectionLock) {
-      // Copy statements before iterating because statement.close() modifies 
it.
-      for (AbstractDruidJdbcStatement statement : 
ImmutableList.copyOf(statements.values())) {
-        try {
-          statement.close();
-        }
-        catch (Exception e) {
-          LOG.warn("Connection [%s] failed to close statement [%s]!", 
connectionId, statement.getStatementId());
-        }
+    // Copy statements before iterating because statement.close() modifies it.
+    for (AbstractDruidJdbcStatement statement : 
ImmutableList.copyOf(statements.values())) {
+      try {
+        statement.close();
+      }
+      catch (Exception e) {
+        LOG.warn("Connection [%s] failed to close statement [%s]!", 
connectionId, statement.getStatementId());
       }
-
-      LOG.debug("Connection [%s] closed.", connectionId);
-      open = false;
     }
+
+    LOG.debug("Connection [%s] closed.", connectionId);
+    open = false;
   }
 
   public DruidConnection sync(final Future<?> newTimeoutFuture)
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
index 3cd608addb..dcd599c542 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.DirectStatement;
 import org.apache.druid.sql.PreparedStatement;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 import org.apache.druid.sql.calcite.planner.PrepareResult;
 
 import java.util.List;
@@ -50,10 +51,11 @@ public class DruidJdbcPreparedStatement extends 
AbstractDruidJdbcStatement
       final String connectionId,
       final int statementId,
       final PreparedStatement stmt,
-      final long maxRowCount
+      final long maxRowCount,
+      final ResultFetcherFactory fetcherFactory
   )
   {
-    super(connectionId, statementId);
+    super(connectionId, statementId, fetcherFactory);
     this.sqlStatement = stmt;
     this.maxRowCount = maxRowCount;
   }
@@ -98,7 +100,7 @@ public class DruidJdbcPreparedStatement extends 
AbstractDruidJdbcStatement
     closeResultSet();
     try {
       DirectStatement directStmt = sqlStatement.execute(parameters);
-      resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount);
+      resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount, 
fetcherFactory);
       resultSet.execute();
     }
     // Failure to execute does not close the prepared statement.
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 2b49401552..1eb0d1aa5e 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -22,20 +22,27 @@ package org.apache.druid.sql.avatica;
 import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.calcite.avatica.Meta;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.sql.DirectStatement;
 
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Druid's server-side representation of a JDBC result set. At most one
@@ -59,6 +66,105 @@ import java.util.concurrent.ExecutorService;
  */
 public class DruidJdbcResultSet implements Closeable
 {
+  private static final Logger LOG = new Logger(DruidJdbcResultSet.class);
+
+  /**
+   * Asynchronous result fetcher. JDBC operates via REST, which is subject to
+   * a timeout if a query takes too long to respond. Fortunately, JDBC uses a
+   * batched API, and is perfectly happy to get an empty batch. This class
+   * runs in a separate thread to fetch a batch. If the fetch takes too long,
+   * the JDBC request thread will time out waiting, will return an empty batch
+   * to the client, and will remember the fetch for use in the next fetch
+   * request. The result is that the time it takes to produce results for long
+   * running queries is decoupled from the HTTP timeout.
+   */
+  public static class ResultFetcher implements Callable<Meta.Frame>
+  {
+    private final int limit;
+    private int batchSize;
+    private int offset;
+    private Yielder<Object[]> yielder;
+
+    public ResultFetcher(
+        final int limit,
+        final Yielder<Object[]> yielder
+    )
+    {
+      this.limit = limit;
+      this.yielder = yielder;
+    }
+
+    /**
+     * In an ideal world, the batch size would be a constructor parameter. 
But, JDBC,
+     * oddly, allows a different batch size per request. Hence, we set the 
size using
+     * this method before each fetch attempt.
+     */
+    public void setBatchSize(int batchSize)
+    {
+      this.batchSize = batchSize;
+    }
+
+    /**
+     * Result is only valid between executions, which turns out to be
+     * the only time it is called.
+     */
+    public int offset()
+    {
+      return offset;
+    }
+
+    /**
+     * Fetch the next batch up to the batch size or EOF. Return
+     * the resulting frame. Exceptions are handled by the executor
+     * framework.
+     */
+    @Override
+    public Meta.Frame call()
+    {
+      Preconditions.checkState(batchSize > 0);
+      int rowCount = 0;
+      final int batchLimit = Math.min(limit - offset, batchSize);
+      final List<Object> rows = new ArrayList<>(batchLimit);
+      while (!yielder.isDone() && rowCount < batchLimit) {
+        rows.add(yielder.get());
+        yielder = yielder.next(null);
+        rowCount++;
+      }
+
+      final Meta.Frame result = new Meta.Frame(offset, yielder.isDone(), rows);
+      offset += rowCount;
+      return result;
+    }
+  }
+
+  /**
+   * Creates the result fetcher and holds config. Rather overkill for 
production,
+   * but handy for testing.
+   */
+  public static class ResultFetcherFactory
+  {
+    final int fetchTimeoutMs;
+
+    public ResultFetcherFactory(int fetchTimeoutMs)
+    {
+      // To prevent server hammering, the timeout must be at least 1 second.
+      this.fetchTimeoutMs = Math.max(1000, fetchTimeoutMs);
+    }
+
+    public int fetchTimeoutMs()
+    {
+      return fetchTimeoutMs;
+    }
+
+    public ResultFetcher newFetcher(
+        final int limit,
+        final Yielder<Object[]> yielder
+    )
+    {
+      return new ResultFetcher(limit, yielder);
+    }
+  }
+
   /**
    * Query metrics can only be used within a single thread. Because results can
    * be paginated into multiple JDBC frames (each frame being processed by a
@@ -77,25 +183,46 @@ public class DruidJdbcResultSet implements Closeable
    * https://github.com/apache/druid/pull/4288
    * https://github.com/apache/druid/pull/4415
    */
-  private final ExecutorService yielderOpenCloseExecutor;
+  private final ExecutorService queryExecutor;
   private final DirectStatement stmt;
   private final long maxRowCount;
+  private final ResultFetcherFactory fetcherFactory;
   private State state = State.NEW;
   private Meta.Signature signature;
-  private Yielder<Object[]> yielder;
-  private int offset;
+
+  /**
+   * The fetcher which reads batches of rows. Holds onto the yielder for a
+   * query. Maintains the current read offset.
+   */
+  private ResultFetcher fetcher;
+
+  /**
+   * Future for a fetch that timed out waiting, and should be used again on
+   * the next fetch request.
+   */
+  private Future<Meta.Frame> fetchFuture;
+
+  /**
+   * Cached version of the read offset in case the caller asks for the offset
+   * concurrently with a fetch which may update its own offset. This offset
+   * is that for the last batch that the client fetched: the fetcher itself
+   * may be moving to a new offset.
+   */
+  private int nextFetchOffset;
 
   public DruidJdbcResultSet(
       final AbstractDruidJdbcStatement jdbcStatement,
       final DirectStatement stmt,
-      final long maxRowCount
+      final long maxRowCount,
+      final ResultFetcherFactory fetcherFactory
   )
   {
     this.stmt = stmt;
     this.maxRowCount = maxRowCount;
-    this.yielderOpenCloseExecutor = Execs.singleThreaded(
+    this.fetcherFactory = fetcherFactory;
+    this.queryExecutor = Execs.singleThreaded(
         StringUtils.format(
-            "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
+            "JDBCQueryExecutor-connection-%s-statement-%d",
             StringUtils.encodeForFormat(jdbcStatement.getConnectionId()),
             jdbcStatement.getStatementId()
         )
@@ -107,19 +234,22 @@ public class DruidJdbcResultSet implements Closeable
     ensure(State.NEW);
     try {
       state = State.RUNNING;
-      final Sequence<Object[]> baseSequence = 
yielderOpenCloseExecutor.submit(stmt::execute).get().getResults();
 
-      // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
-      final Sequence<Object[]> retSequence =
-          maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
-          ? baseSequence.limit((int) maxRowCount)
-          : baseSequence;
+      // Execute the first step: plan the query and return a sequence to use
+      // to get values.
+      final Sequence<Object[]> sequence = 
queryExecutor.submit(stmt::execute).get().getResults();
 
-      yielder = Yielders.each(retSequence);
+      // Subsequent fetch steps are done via the async "fetcher".
+      fetcher = fetcherFactory.newFetcher(
+          // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
+          maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int) 
maxRowCount : Integer.MAX_VALUE,
+          Yielders.each(sequence)
+      );
       signature = AbstractDruidJdbcStatement.createSignature(
           stmt.prepareResult(),
           stmt.query().sql()
       );
+      LOG.debug("Opened result set [%s]", stmt.sqlQueryId());
     }
     catch (ExecutionException e) {
       throw closeAndPropagateThrowable(e.getCause());
@@ -143,34 +273,61 @@ public class DruidJdbcResultSet implements Closeable
   public synchronized Meta.Frame nextFrame(final long fetchOffset, final int 
fetchMaxRowCount)
   {
     ensure(State.RUNNING, State.DONE);
-    Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] != 
offset [%,d]", fetchOffset, offset);
+    if (fetchOffset != nextFetchOffset) {
+      throw new IAE(
+          "Druid can only fetch forward. Requested offset of %,d != current 
offset %,d",
+          fetchOffset,
+          nextFetchOffset
+      );
+    }
     if (state == State.DONE) {
-      return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+      LOG.debug("EOF at offset %,d for result set [%s]", fetchOffset, 
stmt.sqlQueryId());
+      return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
     }
 
+    final Future<Meta.Frame> future;
+    if (fetchFuture == null) {
+      // Not waiting on a batch. Request one now.
+      fetcher.setBatchSize(fetchMaxRowCount);
+      future = queryExecutor.submit(fetcher);
+    } else {
+      // Last batch took too long. Continue waiting for it.
+      future = fetchFuture;
+      fetchFuture = null;
+    }
     try {
-      final List<Object> rows = new ArrayList<>();
-      while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset < 
fetchOffset + fetchMaxRowCount)) {
-        rows.add(yielder.get());
-        yielder = yielder.next(null);
-        offset++;
-      }
-
-      if (yielder.isDone()) {
+      Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(), 
TimeUnit.MILLISECONDS);
+      LOG.debug("Fetched batch at offset %,d for result set [%s]", 
fetchOffset, stmt.sqlQueryId());
+      if (result.done) {
         state = State.DONE;
       }
-
-      return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+      nextFetchOffset = fetcher.offset;
+      return result;
     }
-    catch (Throwable t) {
-      throw closeAndPropagateThrowable(t);
+    catch (CancellationException | InterruptedException e) {
+      // Consider this a failure.
+      throw closeAndPropagateThrowable(e);
+    }
+    catch (ExecutionException e) {
+      // Fetch threw an error. Unwrap it.
+      throw closeAndPropagateThrowable(e.getCause());
+    }
+    catch (TimeoutException e) {
+      LOG.debug("Timeout of batch at offset %,d for result set [%s]", 
fetchOffset, stmt.sqlQueryId());
+      fetchFuture = future;
+      // Wait timed out. Return 0 rows: the client will try again later.
+      // We'll wait again on this same fetch next time.
+      // Note that when the next fetch request comes, it will use the batch
+      // size set here: any change in size will be ignored for the in-flight 
batch.
+      // Changing batch size mid-query is an odd case: it will probably never 
happen.
+      return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());
     }
   }
 
   public synchronized long getCurrentOffset()
   {
     ensure(State.RUNNING, State.DONE);
-    return offset;
+    return fetcher.offset;
   }
 
   @GuardedBy("this")
@@ -215,14 +372,28 @@ public class DruidJdbcResultSet implements Closeable
     if (state == State.CLOSED || state == State.FAILED) {
       return;
     }
+    LOG.debug("Closing result set [%s]", stmt.sqlQueryId());
     state = State.CLOSED;
     try {
-      if (yielder != null) {
-        Yielder<Object[]> theYielder = this.yielder;
-        this.yielder = null;
+      // If a fetch is in progress, wait for it to complete.
+      if (fetchFuture != null) {
+        try {
+          fetchFuture.cancel(true);
+          fetchFuture.get();
+        }
+        catch (Exception e) {
+          // Ignore, we're shutting down anyway.
+        }
+        finally {
+          fetchFuture = null;
+        }
+      }
+      if (fetcher != null) {
+        Yielder<Object[]> theYielder = fetcher.yielder;
+        fetcher = null;
 
         // Put the close last, so any exceptions it throws are after we did 
the other cleanup above.
-        yielderOpenCloseExecutor.submit(
+        queryExecutor.submit(
             () -> {
               theYielder.close();
               // makes this a Callable instead of Runnable so we don't need to 
catch exceptions inside the lambda
@@ -230,7 +401,7 @@ public class DruidJdbcResultSet implements Closeable
             }
         ).get();
 
-        yielderOpenCloseExecutor.shutdownNow();
+        queryExecutor.shutdownNow();
       }
     }
     catch (RuntimeException e) {
diff --git 
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
index 3b84b7e483..4c342a46fe 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
@@ -24,6 +24,9 @@ import org.apache.calcite.avatica.Meta;
 import org.apache.druid.sql.DirectStatement;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
+
+import java.util.Map;
 
 /**
  * Represents Druid's version of the JDBC {@code Statement} class:
@@ -36,22 +39,27 @@ import org.apache.druid.sql.SqlStatementFactory;
 public class DruidJdbcStatement extends AbstractDruidJdbcStatement
 {
   private final SqlStatementFactory lifecycleFactory;
+  protected final Map<String, Object> queryContext;
 
   public DruidJdbcStatement(
       final String connectionId,
       final int statementId,
-      final SqlStatementFactory lifecycleFactory
+      final Map<String, Object> queryContext,
+      final SqlStatementFactory lifecycleFactory,
+      final ResultFetcherFactory fetcherFactory
   )
   {
-    super(connectionId, statementId);
+    super(connectionId, statementId, fetcherFactory);
+    this.queryContext = queryContext;
     this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory, 
"lifecycleFactory");
   }
 
   public synchronized void execute(SqlQueryPlus queryPlus, long maxRowCount)
   {
     closeResultSet();
+    queryPlus = queryPlus.withContext(queryContext);
     DirectStatement stmt = lifecycleFactory.directStatement(queryPlus);
-    resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE);
+    resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE, 
fetcherFactory);
     try {
       resultSet.execute();
     }
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
index 6ee3de811c..76efecf68b 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
@@ -49,6 +49,7 @@ import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.joda.time.Interval;
 
@@ -95,7 +96,20 @@ public class DruidMeta extends MetaImpl
    */
   public static <T extends Throwable> T logFailure(T error)
   {
-    logFailure(error, error.getMessage());
+    if (error instanceof NoSuchConnectionException) {
+      NoSuchConnectionException ex = (NoSuchConnectionException) error;
+      logFailure(error, "No such connection: %s", ex.getConnectionId());
+    } else if (error instanceof NoSuchStatementException) {
+      NoSuchStatementException ex = (NoSuchStatementException) error;
+      logFailure(
+          error,
+          "No such statement: %s, %d",
+          ex.getStatementHandle().connectionId,
+          ex.getStatementHandle().id
+      );
+    } else {
+      logFailure(error, error.getMessage());
+    }
     return error;
   }
 
@@ -115,6 +129,7 @@ public class DruidMeta extends MetaImpl
   private final AvaticaServerConfig config;
   private final List<Authenticator> authenticators;
   private final ErrorHandler errorHandler;
+  private final ResultFetcherFactory fetcherFactory;
 
   /**
    * Tracks logical connections.
@@ -145,7 +160,8 @@ public class DruidMeta extends MetaImpl
                 .setDaemon(true)
                 .build()
         ),
-        authMapper.getAuthenticatorChain()
+        authMapper.getAuthenticatorChain(),
+        new ResultFetcherFactory(config.getFetchTimeoutMs())
     );
   }
 
@@ -154,7 +170,8 @@ public class DruidMeta extends MetaImpl
       final AvaticaServerConfig config,
       final ErrorHandler errorHandler,
       final ScheduledExecutorService exec,
-      final List<Authenticator> authenticators
+      final List<Authenticator> authenticators,
+      final ResultFetcherFactory fetcherFactory
   )
   {
     super(null);
@@ -163,6 +180,7 @@ public class DruidMeta extends MetaImpl
     this.errorHandler = errorHandler;
     this.exec = exec;
     this.authenticators = authenticators;
+    this.fetcherFactory = fetcherFactory;
   }
 
   @Override
@@ -188,11 +206,6 @@ public class DruidMeta extends MetaImpl
     try {
       openDruidConnection(ch.id, secret, contextMap);
     }
-    catch (NoSuchConnectionException e) {
-      // Avoid sanitizing Avatica specific exceptions so that the Avatica code
-      // can rely on them to handle issues in a JDBC-specific way.
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -208,9 +221,6 @@ public class DruidMeta extends MetaImpl
         druidConnection.close();
       }
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -224,9 +234,6 @@ public class DruidMeta extends MetaImpl
       getDruidConnection(ch.id);
       return connProps;
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -241,12 +248,10 @@ public class DruidMeta extends MetaImpl
   public StatementHandle createStatement(final ConnectionHandle ch)
   {
     try {
-      final DruidJdbcStatement druidStatement = 
getDruidConnection(ch.id).createStatement(sqlStatementFactory);
+      final DruidJdbcStatement druidStatement = getDruidConnection(ch.id)
+          .createStatement(sqlStatementFactory, fetcherFactory);
       return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -275,15 +280,13 @@ public class DruidMeta extends MetaImpl
       final DruidJdbcPreparedStatement stmt = 
getDruidConnection(ch.id).createPreparedStatement(
           sqlStatementFactory,
           sqlReq,
-          maxRowCount
+          maxRowCount,
+          fetcherFactory
       );
       stmt.prepare();
       LOG.debug("Successfully prepared statement [%s] for execution", 
stmt.getStatementId());
       return new StatementHandle(ch.id, stmt.getStatementId(), 
stmt.getSignature());
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -315,7 +318,7 @@ public class DruidMeta extends MetaImpl
   }
 
   /**
-   * Prepares and executes a JDBC {@code Statement}
+   * Prepares and executes a JDBC {@code Statement}.
    */
   @Override
   public ExecuteResult prepareAndExecute(
@@ -324,26 +327,25 @@ public class DruidMeta extends MetaImpl
       final long maxRowCount,
       final int maxRowsInFirstFrame,
       final PrepareCallback callback
-  ) throws NoSuchStatementException
+  )
   {
-
     try {
       // Ignore "callback", this class is designed for use with LocalService 
which doesn't use it.
       final DruidJdbcStatement druidStatement = getDruidStatement(statement, 
DruidJdbcStatement.class);
       final DruidConnection druidConnection = 
getDruidConnection(statement.connectionId);
-      AuthenticationResult authenticationResult = 
doAuthenticate(druidConnection);
-      SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
-          .auth(authenticationResult)
-          .context(druidConnection.sessionContext())
-          .build();
-      druidStatement.execute(sqlRequest, maxRowCount);
-      ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
-      LOG.debug("Successfully prepared statement [%s] and started execution", 
druidStatement.getStatementId());
-      return result;
-    }
-    // Cannot affect these exceptions as Avatica handles them.
-    catch (NoSuchConnectionException | NoSuchStatementException e) {
-      throw e;
+
+      // This method is called directly from the Avatica server: it does not go
+      // through the connection first. We must lock the connection here to 
prevent race conditions.
+      synchronized (druidConnection) {
+        final AuthenticationResult authenticationResult = 
doAuthenticate(druidConnection);
+        final SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
+            .auth(authenticationResult)
+            .build();
+        druidStatement.execute(sqlRequest, maxRowCount);
+        final ExecuteResult result = doFetch(druidStatement, 
maxRowsInFirstFrame);
+        LOG.debug("Successfully prepared statement [%s] and started 
execution", druidStatement.getStatementId());
+        return result;
+      }
     }
     catch (Throwable t) {
       throw mapException(t);
@@ -357,6 +359,15 @@ public class DruidMeta extends MetaImpl
    */
   private RuntimeException mapException(Throwable t)
   {
+    // Don't sanitize or wrap Avatica exceptions: these exceptions
+    // are handled specially by Avatica to provide SQLState, Error Code
+    // and other JDBC-specific items.
+    if (t instanceof AvaticaRuntimeException) {
+      throw (AvaticaRuntimeException) t;
+    }
+    if (t instanceof NoSuchConnectionException) {
+      throw (NoSuchConnectionException) t;
+    }
     // BasicSecurityAuthenticationException is not visible here.
     String className = t.getClass().getSimpleName();
     if (t instanceof ForbiddenException ||
@@ -365,7 +376,8 @@ public class DruidMeta extends MetaImpl
           t.getMessage(),
           ErrorResponse.UNAUTHORIZED_ERROR_CODE,
           ErrorResponse.UNAUTHORIZED_SQL_STATE,
-          AvaticaSeverity.ERROR);
+          AvaticaSeverity.ERROR
+      );
     }
 
     // Let Avatica do its default mapping.
@@ -425,9 +437,6 @@ public class DruidMeta extends MetaImpl
       LOG.debug("Fetching next frame from offset %,d with %,d rows for 
statement [%s]", offset, maxRows, statement.id);
       return getDruidStatement(statement, 
AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -450,7 +459,7 @@ public class DruidMeta extends MetaImpl
       final StatementHandle statement,
       final List<TypedValue> parameterValues,
       final int maxRowsInFirstFrame
-  ) throws NoSuchStatementException
+  )
   {
     try {
       final DruidJdbcPreparedStatement druidStatement =
@@ -462,9 +471,6 @@ public class DruidMeta extends MetaImpl
           druidStatement.getStatementId());
       return result;
     }
-    catch (NoSuchStatementException | NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -493,9 +499,6 @@ public class DruidMeta extends MetaImpl
         druidConnection.closeStatement(h.id);
       }
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -506,7 +509,7 @@ public class DruidMeta extends MetaImpl
       final StatementHandle sh,
       final QueryState state,
       final long offset
-  ) throws NoSuchStatementException
+  )
   {
     try {
       final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh, 
AbstractDruidJdbcStatement.class);
@@ -521,9 +524,6 @@ public class DruidMeta extends MetaImpl
       }
       return !isDone;
     }
-    catch (NoSuchStatementException | NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -560,9 +560,6 @@ public class DruidMeta extends MetaImpl
 
       return sqlResultSet(ch, sql);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -597,9 +594,6 @@ public class DruidMeta extends MetaImpl
 
       return sqlResultSet(ch, sql);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -656,9 +650,6 @@ public class DruidMeta extends MetaImpl
 
       return sqlResultSet(ch, sql);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -726,9 +717,6 @@ public class DruidMeta extends MetaImpl
 
       return sqlResultSet(ch, sql);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -747,9 +735,6 @@ public class DruidMeta extends MetaImpl
 
       return sqlResultSet(ch, sql);
     }
-    catch (NoSuchConnectionException e) {
-      throw e;
-    }
     catch (Throwable t) {
       throw mapException(t);
     }
@@ -851,7 +836,7 @@ public class DruidMeta extends MetaImpl
     return connection.sync(
         exec.schedule(
             () -> {
-              LOG.debug("Connection[%s] timed out.", connectionId);
+              LOG.debug("Connection [%s] timed out.", connectionId);
               closeConnection(new ConnectionHandle(connectionId));
             },
             new Interval(DateTimes.nowUtc(), 
config.getConnectionIdleTimeout()).toDurationMillis(),
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 119d10ae4e..9cf65fe196 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -67,6 +68,9 @@ import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcher;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@@ -90,11 +94,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.ResultIterator;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -117,30 +122,27 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Tests the Avatica-based JDBC implementation using JSON serialization. See
  * {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs
  * this same set of tests using Protobuf serialization.
+ * To run this in an IDE, set {@code -Duser.timezone=UTC}.
  */
 public class DruidAvaticaHandlerTest extends CalciteTestBase
 {
-  private static final AvaticaServerConfig AVATICA_CONFIG = new 
AvaticaServerConfig()
-  {
-    @Override
-    public int getMaxConnections()
-    {
-      // This must match the number of Connection objects created in 
testTooManyStatements()
-      return 4;
-    }
+  private static final int CONNECTION_LIMIT = 4;
+  private static final int STATEMENT_LIMIT = 4;
+
+  private static final AvaticaServerConfig AVATICA_CONFIG;
+
+  static {
+    AVATICA_CONFIG = new AvaticaServerConfig();
+    // This must match the number of Connection objects created in 
testTooManyStatements()
+    AVATICA_CONFIG.maxConnections = CONNECTION_LIMIT;
+    AVATICA_CONFIG.maxStatementsPerConnection = STATEMENT_LIMIT;
+  }
 
-    @Override
-    public int getMaxStatementsPerConnection()
-    {
-      return 4;
-    }
-  };
   private static final String DUMMY_SQL_QUERY_ID = "dummy";
 
   private static QueryRunnerFactoryConglomerate conglomerate;
@@ -162,35 +164,99 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
     resourceCloser.close();
   }
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Rule
   public QueryLogHook queryLogHook = QueryLogHook.create();
 
+  private final PlannerConfig plannerConfig = new PlannerConfig();
+  private final DruidOperatorTable operatorTable = 
CalciteTests.createOperatorTable();
+  private final ExprMacroTable macroTable = 
CalciteTests.createExprMacroTable();
   private SpecificSegmentsQuerySegmentWalker walker;
-  private Server server;
+  private ServerWrapper server;
   private Connection client;
   private Connection clientNoTrailingSlash;
   private Connection superuserClient;
   private Connection clientLosAngeles;
-  private DruidMeta druidMeta;
-  private String url;
   private Injector injector;
   private TestRequestLogger testRequestLogger;
 
+  private DruidSchemaCatalog makeRootSchema()
+  {
+    return CalciteTests.createMockRootSchema(
+        conglomerate,
+        walker,
+        plannerConfig,
+        CalciteTests.TEST_AUTHORIZER_MAPPER
+    );
+  }
+
+  private class ServerWrapper
+  {
+    final DruidMeta druidMeta;
+    final Server server;
+    final String url;
+
+    ServerWrapper(final DruidMeta druidMeta) throws Exception
+    {
+      this.druidMeta = druidMeta;
+      server = new Server(0);
+      server.setHandler(getAvaticaHandler(druidMeta));
+      server.start();
+      url = StringUtils.format(
+          "jdbc:avatica:remote:url=%s%s",
+          server.getURI().toString(),
+          StringUtils.maybeRemoveLeadingSlash(getJdbcUrlTail())
+      );
+    }
+
+    public Connection getConnection(String user, String password) throws 
SQLException
+    {
+      return DriverManager.getConnection(url, user, password);
+    }
+
+    public Connection getUserConnection() throws SQLException
+    {
+      return getConnection("regularUser", "druid");
+    }
+
+    // Note: though the URL-only form is OK in general, but it will cause tests
+    // to crash as the mock auth test code needs the user name.
+    // Use getUserConnection() instead, or create a URL that includes the
+    // user name and password.
+    //public Connection getConnection() throws SQLException
+    //{
+    //  return DriverManager.getConnection(url);
+    //}
+
+    public void close() throws Exception
+    {
+      druidMeta.closeAllConnections();
+      server.stop();
+    }
+  }
+
+  protected String getJdbcUrlTail()
+  {
+    return DruidAvaticaJsonHandler.AVATICA_PATH;
+  }
+
+  // Default implementation is for JSON to allow debugging of tests.
+  protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+  {
+    return new DruidAvaticaJsonHandler(
+            druidMeta,
+            new DruidNode("dummy", "dummy", false, 1, null, true, false),
+            new AvaticaMonitor()
+    );
+  }
+
   @Before
   public void setUp() throws Exception
   {
     walker = CalciteTests.createMockWalker(conglomerate, 
temporaryFolder.newFolder());
-    final PlannerConfig plannerConfig = new PlannerConfig();
-    final DruidOperatorTable operatorTable = 
CalciteTests.createOperatorTable();
-    final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
-    final DruidSchemaCatalog rootSchema =
-        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, 
CalciteTests.TEST_AUTHORIZER_MAPPER);
+    final DruidSchemaCatalog rootSchema = makeRootSchema();
     testRequestLogger = new TestRequestLogger();
 
     injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
@@ -227,37 +293,34 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
          )
         .build();
 
-    druidMeta = injector.getInstance(DruidMeta.class);
-    final AbstractAvaticaHandler handler = this.getAvaticaHandler(druidMeta);
-    final int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
-    server = new Server(new InetSocketAddress("127.0.0.1", port));
-    server.setHandler(handler);
-    server.start();
-    url = this.getJdbcConnectionString(port);
-    client = DriverManager.getConnection(url, "regularUser", "druid");
-    superuserClient = DriverManager.getConnection(url, 
CalciteTests.TEST_SUPERUSER_NAME, "druid");
-    clientNoTrailingSlash = 
DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(url), 
CalciteTests.TEST_SUPERUSER_NAME, "druid");
+    DruidMeta druidMeta = injector.getInstance(DruidMeta.class);
+    server = new ServerWrapper(druidMeta);
+    client = server.getUserConnection();
+    superuserClient = server.getConnection(CalciteTests.TEST_SUPERUSER_NAME, 
"druid");
+    clientNoTrailingSlash = 
DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(server.url), 
CalciteTests.TEST_SUPERUSER_NAME, "druid");
 
     final Properties propertiesLosAngeles = new Properties();
     propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
     propertiesLosAngeles.setProperty("user", "regularUserLA");
     propertiesLosAngeles.setProperty(BaseQuery.SQL_QUERY_ID, 
DUMMY_SQL_QUERY_ID);
-    clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
+    clientLosAngeles = DriverManager.getConnection(server.url, 
propertiesLosAngeles);
   }
 
   @After
   public void tearDown() throws Exception
   {
-    client.close();
-    clientLosAngeles.close();
-    clientNoTrailingSlash.close();
-    server.stop();
+    if (server != null) {
+      client.close();
+      clientLosAngeles.close();
+      clientNoTrailingSlash.close();
+      server.close();
+      client = null;
+      clientLosAngeles = null;
+      clientNoTrailingSlash = null;
+      server = null;
+    }
     walker.close();
     walker = null;
-    client = null;
-    clientLosAngeles = null;
-    clientNoTrailingSlash = null;
-    server = null;
   }
 
   @Test
@@ -772,19 +835,21 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
   @Test
   public void testTooManyStatements() throws SQLException
   {
-    for (int i = 0; i < 4; i++) {
+    for (int i = 0; i < STATEMENT_LIMIT; i++) {
       client.createStatement();
     }
 
-    expectedException.expect(AvaticaClientRuntimeException.class);
-    expectedException.expectMessage("Too many open statements, limit is 4");
-    client.createStatement();
+    AvaticaClientRuntimeException ex = Assert.assertThrows(
+        AvaticaClientRuntimeException.class,
+        () -> client.createStatement()
+    );
+    Assert.assertTrue(ex.getMessage().contains("Too many open statements, 
limit is 4"));
   }
 
   @Test
   public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException
   {
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < STATEMENT_LIMIT * 2; i++) {
       client.createStatement().close();
     }
   }
@@ -863,7 +928,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
           ImmutableList.of(ImmutableMap.of("cnt", 6L)),
           getRows(resultSet)
       );
-      druidMeta.closeAllConnections();
+      server.druidMeta.closeAllConnections();
     }
   }
 
@@ -875,70 +940,72 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
     superuserClient.createStatement();
     clientNoTrailingSlash.createStatement();
 
-    expectedException.expect(AvaticaClientRuntimeException.class);
-    expectedException.expectMessage("Too many connections");
-
-    DriverManager.getConnection(url);
+    AvaticaClientRuntimeException ex = Assert.assertThrows(
+        AvaticaClientRuntimeException.class,
+        () -> server.getUserConnection()
+    );
+    Assert.assertTrue(ex.getMessage().contains("Too many connections"));
   }
 
   @Test
-  public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException
+  public void testNotTooManyConnectionsWhenTheyAreClosed() throws SQLException
   {
-    for (int i = 0; i < 4; i++) {
-      try (Connection connection = DriverManager.getConnection(url)) {
+    for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
+      try (Connection connection = server.getUserConnection()) {
       }
     }
   }
 
   @Test
-  public void testMaxRowsPerFrame() throws Exception
+  public void testConnectionsCloseStatements() throws SQLException
   {
-    final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
-    {
-      @Override
-      public int getMaxConnections()
-      {
-        return 2;
+    for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
+      try (Connection connection = server.getUserConnection()) {
+        // Note: NOT in a try-catch block. Let the connection close the 
statement
+        final Statement statement = connection.createStatement();
+
+        // Again, NOT in a try-catch block: let the statement close the
+        // result set.
+        final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS 
cnt FROM druid.foo");
+        Assert.assertTrue(resultSet.next());
       }
+    }
+  }
 
-      @Override
-      public int getMaxStatementsPerConnection()
-      {
-        return 4;
-      }
+  private SqlStatementFactory makeStatementFactory()
+  {
+    return CalciteTests.createSqlStatementFactory(
+        CalciteTests.createMockSqlEngine(walker, conglomerate),
+        new PlannerFactory(
+            makeRootSchema(),
+            operatorTable,
+            macroTable,
+            plannerConfig,
+            AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+            CalciteTests.getJsonMapper(),
+            CalciteTests.DRUID_SCHEMA_NAME,
+            new CalciteRulesManager(ImmutableSet.of())
+        )
+    );
+  }
 
-      @Override
-      public int getMaxRowsPerFrame()
-      {
-        return 2;
-      }
-    };
+  @Test
+  public void testMaxRowsPerFrame() throws Exception
+  {
+    final AvaticaServerConfig config = new AvaticaServerConfig();
+    config.maxConnections = 2;
+    config.maxStatementsPerConnection = STATEMENT_LIMIT;
+    config.maxRowsPerFrame = 2;
 
-    final PlannerConfig plannerConfig = new PlannerConfig();
-    final DruidOperatorTable operatorTable = 
CalciteTests.createOperatorTable();
-    final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final List<Meta.Frame> frames = new ArrayList<>();
     final ScheduledExecutorService exec = 
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
-    DruidSchemaCatalog rootSchema =
-        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, 
AuthTestUtils.TEST_AUTHORIZER_MAPPER);
     DruidMeta smallFrameDruidMeta = new DruidMeta(
-        CalciteTests.createSqlStatementFactory(
-            CalciteTests.createMockSqlEngine(walker, conglomerate),
-            new PlannerFactory(
-                rootSchema,
-                operatorTable,
-                macroTable,
-                plannerConfig,
-                AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-                CalciteTests.getJsonMapper(),
-                CalciteTests.DRUID_SCHEMA_NAME,
-                new CalciteRulesManager(ImmutableSet.of())
-            )
-        ),
-        smallFrameConfig,
+        makeStatementFactory(),
+        config,
         new ErrorHandler(new ServerConfig()),
         exec,
-        injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
+        
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+        new ResultFetcherFactory(config.fetchTimeoutMs)
     )
     {
       @Override
@@ -955,13 +1022,8 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
       }
     };
 
-    final AbstractAvaticaHandler handler = 
this.getAvaticaHandler(smallFrameDruidMeta);
-    final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
-    Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", 
port));
-    smallFrameServer.setHandler(handler);
-    smallFrameServer.start();
-    String smallFrameUrl = this.getJdbcConnectionString(port);
-    Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, 
"regularUser", "druid");
+    ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
+    Connection smallFrameClient = server.getUserConnection();
 
     final ResultSet resultSet = 
smallFrameClient.createStatement().executeQuery(
         "SELECT dim1 FROM druid.foo"
@@ -980,59 +1042,29 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
         rows
     );
 
+    resultSet.close();
+    smallFrameClient.close();
     exec.shutdown();
+    server.close();
   }
 
   @Test
   public void testMinRowsPerFrame() throws Exception
   {
-    final int minFetchSize = 1000;
-    final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
-    {
-      @Override
-      public int getMaxConnections()
-      {
-        return 2;
-      }
-
-      @Override
-      public int getMaxStatementsPerConnection()
-      {
-        return 4;
-      }
+    final AvaticaServerConfig config = new AvaticaServerConfig();
+    config.maxConnections = 2;
+    config.maxStatementsPerConnection = STATEMENT_LIMIT;
+    config.minRowsPerFrame = 1000;
 
-      @Override
-      public int getMinRowsPerFrame()
-      {
-        return minFetchSize;
-      }
-    };
-
-    final PlannerConfig plannerConfig = new PlannerConfig();
-    final DruidOperatorTable operatorTable = 
CalciteTests.createOperatorTable();
-    final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final List<Meta.Frame> frames = new ArrayList<>();
     final ScheduledExecutorService exec = 
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
-    DruidSchemaCatalog rootSchema =
-        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, 
AuthTestUtils.TEST_AUTHORIZER_MAPPER);
     DruidMeta smallFrameDruidMeta = new DruidMeta(
-        CalciteTests.createSqlStatementFactory(
-            CalciteTests.createMockSqlEngine(walker, conglomerate),
-            new PlannerFactory(
-                rootSchema,
-                operatorTable,
-                macroTable,
-                plannerConfig,
-                AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-                CalciteTests.getJsonMapper(),
-                CalciteTests.DRUID_SCHEMA_NAME,
-                new CalciteRulesManager(ImmutableSet.of())
-            )
-        ),
-        smallFrameConfig,
+        makeStatementFactory(),
+        config,
         new ErrorHandler(new ServerConfig()),
         exec,
-        injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
+        
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+        new ResultFetcherFactory(config.fetchTimeoutMs)
     )
     {
       @Override
@@ -1043,20 +1075,15 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
       ) throws NoSuchStatementException, MissingResultsException
       {
         // overriding fetch allows us to track how many frames are processed 
after the first frame, and also fetch size
-        Assert.assertEquals(minFetchSize, fetchMaxRowCount);
+        Assert.assertEquals(config.minRowsPerFrame, fetchMaxRowCount);
         Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
         frames.add(frame);
         return frame;
       }
     };
 
-    final AbstractAvaticaHandler handler = 
this.getAvaticaHandler(smallFrameDruidMeta);
-    final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
-    Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1", 
port));
-    smallFrameServer.setHandler(handler);
-    smallFrameServer.start();
-    String smallFrameUrl = this.getJdbcConnectionString(port);
-    Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl, 
"regularUser", "druid");
+    ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
+    Connection smallFrameClient = server.getUserConnection();
 
     // use a prepared statement because Avatica currently ignores fetchSize on 
the initial fetch of a Statement
     PreparedStatement statement = smallFrameClient.prepareStatement("SELECT 
dim1 FROM druid.foo");
@@ -1078,7 +1105,10 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
         rows
     );
 
+    resultSet.close();
+    smallFrameClient.close();
     exec.shutdown();
+    server.close();
   }
 
   @Test
@@ -1546,24 +1576,124 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
     Assert.fail("Test failed, did not get SQLException");
   }
 
-  // Default implementation is for JSON to allow debugging of tests.
-  protected String getJdbcConnectionString(final int port)
+  private static class TestResultFetcher extends ResultFetcher
   {
-    return StringUtils.format(
-            "jdbc:avatica:remote:url=http://127.0.0.1:%d%s";,
-            port,
-            DruidAvaticaJsonHandler.AVATICA_PATH
-    );
+    public TestResultFetcher(int limit, Yielder<Object[]> yielder)
+    {
+      super(limit, yielder);
+    }
+
+    @Override
+    public Meta.Frame call()
+    {
+      try {
+        if (offset() == 0) {
+          System.out.println("Taking a nap now...");
+          Thread.sleep(3000);
+        }
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return super.call();
+    }
   }
 
-  // Default implementation is for JSON to allow debugging of tests.
-  protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+  /**
+   * Test the async aspect of the Avatica implementation. The fetch of the
+   * first batch takes 3 seconds (due to a sleep). However, the client will
+   * wait only 1 second. So, we should get ~3 empty batches before we get
+   * the first batch with rows.
+   */
+  @Test
+  public void testAsync() throws Exception
   {
-    return new DruidAvaticaJsonHandler(
-            druidMeta,
-            new DruidNode("dummy", "dummy", false, 1, null, true, false),
-            new AvaticaMonitor()
-    );
+    final AvaticaServerConfig config = new AvaticaServerConfig();
+    config.maxConnections = CONNECTION_LIMIT;
+    config.maxStatementsPerConnection = STATEMENT_LIMIT;
+    config.maxRowsPerFrame = 2;
+    config.fetchTimeoutMs = 1000;
+
+    final List<Meta.Frame> frames = new ArrayList<>();
+    final ScheduledExecutorService exec = 
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
+    DruidMeta druidMeta = new DruidMeta(
+        makeStatementFactory(),
+        config,
+        new ErrorHandler(new ServerConfig()),
+        exec,
+        
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+        new ResultFetcherFactory(config.fetchTimeoutMs) {
+          @Override
+          public ResultFetcher newFetcher(
+              final int limit,
+              final Yielder<Object[]> yielder
+          )
+          {
+            return new TestResultFetcher(limit, yielder);
+          }
+        }
+    )
+    {
+      @Override
+      public Frame fetch(
+          final StatementHandle statement,
+          final long offset,
+          final int fetchMaxRowCount
+      ) throws NoSuchStatementException, MissingResultsException
+      {
+        Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
+        frames.add(frame);
+        return frame;
+      }
+    };
+
+    ServerWrapper server = new ServerWrapper(druidMeta);
+    try (Connection conn = server.getUserConnection()) {
+
+      // Test with plain JDBC
+      try (ResultSet resultSet = conn.createStatement().executeQuery(
+          "SELECT dim1 FROM druid.foo")) {
+        List<Map<String, Object>> rows = getRows(resultSet);
+        Assert.assertEquals(6, rows.size());
+        Assert.assertTrue(frames.size() > 3);
+
+        // There should be at least one empty frame due to timeout
+        Assert.assertFalse(frames.get(0).rows.iterator().hasNext());
+      }
+    }
+
+    testWithJDBI(server.url);
+
+    exec.shutdown();
+    server.close();
+  }
+
+  // Test the async feature using DBI, as used internally in Druid.
+  // Ensures that DBI knows how to handle empty batches (which should,
+  // in reality, but handled at the JDBC level below DBI.)
+  private void testWithJDBI(String baseUrl)
+  {
+    String url = baseUrl + "?user=regularUser&password=druid" + 
getJdbcUrlTail();
+    System.out.println(url);
+    DBI dbi = new DBI(url);
+    Handle handle = dbi.open();
+    try {
+      ResultIterator<Pair<Long, String>> iter = handle
+          .createQuery("SELECT __time, dim1 FROM druid.foo")
+          .map((index, row, ctx) -> new Pair<>(row.getLong(1), 
row.getString(2)))
+          .iterator();
+      int count = 0;
+      while (iter.hasNext()) {
+        Pair<Long, String> row = iter.next();
+        Assert.assertNotNull(row.lhs);
+        Assert.assertNotNull(row.rhs);
+        count++;
+      }
+      Assert.assertEquals(6, count);
+    }
+    finally {
+      handle.close();
+    }
   }
 
   private static List<Map<String, Object>> getRows(final ResultSet resultSet) 
throws SQLException
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
index af447f6c13..bbcb0dba92 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
@@ -26,11 +26,10 @@ import org.apache.druid.server.DruidNode;
 public class DruidAvaticaProtobufHandlerTest extends DruidAvaticaHandlerTest
 {
   @Override
-  protected String getJdbcConnectionString(final int port)
+  protected String getJdbcUrlTail()
   {
     return StringUtils.format(
-            
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s;serialization=protobuf";,
-            port,
+            "%s;serialization=protobuf",
             DruidAvaticaProtobufHandler.AVATICA_PATH
     );
   }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index e5dc4a662a..f9b0718d1e 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.server.security.AllowAllAuthenticator;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.sql.SqlQueryPlus;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
 import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -138,7 +139,9 @@ public class DruidStatementTest extends CalciteTestBase
     return new DruidJdbcStatement(
         "",
         0,
-        sqlStatementFactory
+        Collections.emptyMap(),
+        sqlStatementFactory,
+        new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
     );
   }
 
@@ -517,7 +520,8 @@ public class DruidStatementTest extends CalciteTestBase
         "",
         0,
         sqlStatementFactory.preparedStatement(queryPlus),
-        Long.MAX_VALUE
+        Long.MAX_VALUE,
+        new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
     );
   }
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 0370ad72a0..fc0a845ca9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -185,19 +185,19 @@ public class CalciteTests
     public Authorizer getAuthorizer(String name)
     {
       return (authenticationResult, resource, action) -> {
-        if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) {
+        if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) {
           return Access.OK;
         }
 
         switch (resource.getType()) {
           case ResourceType.DATASOURCE:
-            if (resource.getName().equals(FORBIDDEN_DATASOURCE)) {
+            if (FORBIDDEN_DATASOURCE.equals(resource.getName())) {
               return new Access(false);
             } else {
               return Access.OK;
             }
           case ResourceType.VIEW:
-            if (resource.getName().equals("forbiddenView")) {
+            if ("forbiddenView".equals(resource.getName())) {
               return new Access(false);
             } else {
               return Access.OK;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to