This is an automated email from the ASF dual-hosted git repository.
progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b34b4353f4 Async reads for JDBC (#13196)
b34b4353f4 is described below
commit b34b4353f4a27065b37feac97995d4984334f8ed
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Oct 18 11:40:57 2022 -0700
Async reads for JDBC (#13196)
Async reads for JDBC:
Prevents JDBC timeouts on long queries by returning empty batches
when a batch fetch takes too long. Uses an async model to run the
result fetch concurrently with JDBC requests.
Fixed race condition in Druid's Avatica server-side handler
Fixed issue with no-user connections
---
.../druid/java/util/common/concurrent/Execs.java | 1 -
docs/configuration/index.md | 1 +
.../druid/server/AsyncQueryForwardingServlet.java | 2 +-
sql/pom.xml | 6 +-
.../sql/avatica/AbstractDruidJdbcStatement.java | 6 +-
.../druid/sql/avatica/AvaticaServerConfig.java | 17 +
.../druid/sql/avatica/DruidAvaticaJsonHandler.java | 2 +-
.../sql/avatica/DruidAvaticaProtobufHandler.java | 2 +-
.../apache/druid/sql/avatica/DruidConnection.java | 86 ++--
.../sql/avatica/DruidJdbcPreparedStatement.java | 8 +-
.../druid/sql/avatica/DruidJdbcResultSet.java | 237 +++++++++--
.../druid/sql/avatica/DruidJdbcStatement.java | 14 +-
.../org/apache/druid/sql/avatica/DruidMeta.java | 123 +++---
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 466 +++++++++++++--------
.../avatica/DruidAvaticaProtobufHandlerTest.java | 5 +-
.../druid/sql/avatica/DruidStatementTest.java | 8 +-
.../druid/sql/calcite/util/CalciteTests.java | 6 +-
17 files changed, 661 insertions(+), 329 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
index a310e56756..c5bc20f45f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
*/
public class Execs
{
-
/**
* Returns an ExecutorService which is terminated and shutdown from the
beginning and not able to accept any tasks.
*/
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index cb826ef348..6e92076eb7 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1888,6 +1888,7 @@ The Druid SQL server is configured through the following
properties on the Broke
|`druid.sql.avatica.minRowsPerFrame`|Minimum acceptable value for the JDBC
client `Statement.setFetchSize` method. The value for this property must be
greater than 0. If the JDBC client calls `Statement.setFetchSize` with a lesser
value, Druid uses `minRowsPerFrame` instead. If `maxRowsPerFrame` is less than
`minRowsPerFrame`, Druid uses the minimum value of the two. For handling
queries which produce results with a large number of rows, you can increase
this value to reduce the number of f [...]
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous
open statements per Avatica client connection.|4|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle
timeout.|PT5M|
+|`druid.sql.avatica.fetchTimeoutMs`|Avatica fetch timeout, in milliseconds.
When a request for the next batch of data takes longer than this time, Druid
returns an empty result set, causing the client to poll again. This avoids HTTP
timeouts for long-running queries. The default of 5 sec. is good for most
cases. Values below 1000 are raised to 1000 (1 sec.). |5000|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at
`/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN
query](../querying/topnquery.md). Higher limits will be planned as [GroupBy
queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata
refreshes.|PT1M|
diff --git
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index fa3b52669b..75b13a39f1 100644
---
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -402,7 +402,7 @@ public class AsyncQueryForwardingServlet extends
AsyncProxyServlet implements Qu
HttpServletResponse response
) throws ServletException, IOException
{
- // Just call the superclass service method. Overriden in tests.
+ // Just call the superclass service method. Overridden in tests.
super.service(request, response);
}
diff --git a/sql/pom.xml b/sql/pom.xml
index bbf2eb049e..28f7eecffc 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -247,9 +247,13 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>${mockito.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jdbi</groupId>
+ <artifactId>jdbi</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
index 697ad1ca17..1992c1620b 100644
---
a/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/AbstractDruidJdbcStatement.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PrepareResult;
@@ -56,16 +57,19 @@ public abstract class AbstractDruidJdbcStatement implements
Closeable
protected final String connectionId;
protected final int statementId;
+ protected final ResultFetcherFactory fetcherFactory;
protected Throwable throwable;
protected DruidJdbcResultSet resultSet;
public AbstractDruidJdbcStatement(
final String connectionId,
- final int statementId
+ final int statementId,
+ final ResultFetcherFactory fetcherFactory
)
{
this.connectionId = Preconditions.checkNotNull(connectionId,
"connectionId");
this.statementId = statementId;
+ this.fetcherFactory = fetcherFactory;
}
protected static Meta.Signature createSignature(
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
index e931c3a289..a5215c19c0 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/AvaticaServerConfig.java
@@ -30,6 +30,7 @@ class AvaticaServerConfig
public static Period DEFAULT_CONNECTION_IDLE_TIMEOUT = new Period("PT5M");
public static int DEFAULT_MIN_ROWS_PER_FRAME = 100;
public static int DEFAULT_MAX_ROWS_PER_FRAME = 5000;
+ public static int DEFAULT_FETCH_TIMEOUT_MS = 5000;
@JsonProperty
public int maxConnections = DEFAULT_MAX_CONNECTIONS;
@@ -46,6 +47,17 @@ class AvaticaServerConfig
@JsonProperty
public int maxRowsPerFrame = DEFAULT_MAX_ROWS_PER_FRAME;
+ /**
+ * The maximum amount of time to wait per-fetch for the next result set.
+ * If a query takes longer than this amount of time, then the fetch will
+ * return 0 rows, without EOF, and the client will automatically try
+ * another fetch. The result is an async protocol that avoids network
+ * timeouts for long-running queries, especially those that take a long
+ * time to deliver a first batch of results.
+ */
+ @JsonProperty
+ public int fetchTimeoutMs = DEFAULT_FETCH_TIMEOUT_MS;
+
public int getMaxConnections()
{
return maxConnections;
@@ -77,4 +89,9 @@ class AvaticaServerConfig
}
return minRowsPerFrame;
}
+
+ public int getFetchTimeoutMs()
+ {
+ return fetchTimeoutMs;
+ }
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
index e2d223d126..4f1a5818bf 100644
---
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaJsonHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaJsonHandler extends AvaticaJsonHandler
{
- public static final String AVATICA_PATH = "/druid/v2/sql/avatica/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH =
"/druid/v2/sql/avatica";
+ public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH +
"/";
@Inject
public DruidAvaticaJsonHandler(
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
index 50c54ad27a..a15efadda6 100644
---
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandler.java
@@ -35,8 +35,8 @@ import java.io.IOException;
public class DruidAvaticaProtobufHandler extends AvaticaProtobufHandler
{
- public static final String AVATICA_PATH = "/druid/v2/sql/avatica-protobuf/";
public static final String AVATICA_PATH_NO_TRAILING_SLASH =
"/druid/v2/sql/avatica-protobuf";
+ public static final String AVATICA_PATH = AVATICA_PATH_NO_TRAILING_SLASH +
"/";
@Inject
public DruidAvaticaProtobufHandler(
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
index 23f1a222dd..5aa3f422b7 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidConnection.java
@@ -24,8 +24,10 @@ import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.sql.PreparedStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import java.util.Collections;
import java.util.Map;
@@ -37,6 +39,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Connection tracking for {@link DruidMeta}. Thread-safe.
+ * <p>
+ * Lock is the instance itself. Used here to protect two members, and in
+ * other code when we must resolve the connection after resolving the
statement.
+ * The lock prevents closing the connection concurrently with an operation on
+ * a statement for that connection.
*/
public class DruidConnection
{
@@ -56,13 +63,12 @@ public class DruidConnection
private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference<Future<?>> timeoutFuture = new
AtomicReference<>();
- // Typically synchronized by connectionLock, except in one case: the onClose
function passed
+ // Typically synchronized by this instance, except in one case: the onClose
function passed
// into DruidStatements contained by the map.
- @GuardedBy("connectionLock")
+ @GuardedBy("this")
private final ConcurrentMap<Integer, AbstractDruidJdbcStatement> statements
= new ConcurrentHashMap<>();
- private final Object connectionLock = new Object();
- @GuardedBy("connectionLock")
+ @GuardedBy("this")
private boolean open = true;
public DruidConnection(
@@ -93,13 +99,14 @@ public class DruidConnection
return userSecret;
}
- public DruidJdbcStatement createStatement(
- final SqlStatementFactory sqlStatementFactory
+ public synchronized DruidJdbcStatement createStatement(
+ final SqlStatementFactory sqlStatementFactory,
+ final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
- synchronized (connectionLock) {
+ synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old
statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't
have billions of statements.
@@ -114,7 +121,9 @@ public class DruidConnection
final DruidJdbcStatement statement = new DruidJdbcStatement(
connectionId,
statementId,
- sqlStatementFactory
+ sessionContext,
+ sqlStatementFactory,
+ fetcherFactory
);
statements.put(statementId, statement);
@@ -123,15 +132,16 @@ public class DruidConnection
}
}
- public DruidJdbcPreparedStatement createPreparedStatement(
+ public synchronized DruidJdbcPreparedStatement createPreparedStatement(
final SqlStatementFactory sqlStatementFactory,
final SqlQueryPlus sqlQueryPlus,
- final long maxRowCount
+ final long maxRowCount,
+ final ResultFetcherFactory fetcherFactory
)
{
final int statementId = statementCounter.incrementAndGet();
- synchronized (connectionLock) {
+ synchronized (this) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old
statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't
have billions of statements.
@@ -143,11 +153,15 @@ public class DruidConnection
}
@SuppressWarnings("GuardedBy")
+ final PreparedStatement statement =
sqlStatementFactory.preparedStatement(
+ sqlQueryPlus.withContext(sessionContext)
+ );
final DruidJdbcPreparedStatement jdbcStmt = new
DruidJdbcPreparedStatement(
connectionId,
statementId,
- sqlStatementFactory.preparedStatement(sqlQueryPlus),
- maxRowCount
+ statement,
+ maxRowCount,
+ fetcherFactory
);
statements.put(statementId, jdbcStmt);
@@ -156,17 +170,15 @@ public class DruidConnection
}
}
- public AbstractDruidJdbcStatement getStatement(final int statementId)
+ public synchronized AbstractDruidJdbcStatement getStatement(final int
statementId)
{
- synchronized (connectionLock) {
- return statements.get(statementId);
- }
+ return statements.get(statementId);
}
public void closeStatement(int statementId)
{
AbstractDruidJdbcStatement stmt;
- synchronized (connectionLock) {
+ synchronized (this) {
stmt = statements.remove(statementId);
}
if (stmt != null) {
@@ -180,34 +192,30 @@ public class DruidConnection
*
* @return true if closed
*/
- public boolean closeIfEmpty()
+ public synchronized boolean closeIfEmpty()
{
- synchronized (connectionLock) {
- if (statements.isEmpty()) {
- close();
- return true;
- } else {
- return false;
- }
+ if (statements.isEmpty()) {
+ close();
+ return true;
+ } else {
+ return false;
}
}
- public void close()
+ public synchronized void close()
{
- synchronized (connectionLock) {
- // Copy statements before iterating because statement.close() modifies
it.
- for (AbstractDruidJdbcStatement statement :
ImmutableList.copyOf(statements.values())) {
- try {
- statement.close();
- }
- catch (Exception e) {
- LOG.warn("Connection [%s] failed to close statement [%s]!",
connectionId, statement.getStatementId());
- }
+ // Copy statements before iterating because statement.close() modifies it.
+ for (AbstractDruidJdbcStatement statement :
ImmutableList.copyOf(statements.values())) {
+ try {
+ statement.close();
+ }
+ catch (Exception e) {
+ LOG.warn("Connection [%s] failed to close statement [%s]!",
connectionId, statement.getStatementId());
}
-
- LOG.debug("Connection [%s] closed.", connectionId);
- open = false;
}
+
+ LOG.debug("Connection [%s] closed.", connectionId);
+ open = false;
}
public DruidConnection sync(final Future<?> newTimeoutFuture)
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
index 3cd608addb..dcd599c542 100644
---
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
+++
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcPreparedStatement.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.PreparedStatement;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import java.util.List;
@@ -50,10 +51,11 @@ public class DruidJdbcPreparedStatement extends
AbstractDruidJdbcStatement
final String connectionId,
final int statementId,
final PreparedStatement stmt,
- final long maxRowCount
+ final long maxRowCount,
+ final ResultFetcherFactory fetcherFactory
)
{
- super(connectionId, statementId);
+ super(connectionId, statementId, fetcherFactory);
this.sqlStatement = stmt;
this.maxRowCount = maxRowCount;
}
@@ -98,7 +100,7 @@ public class DruidJdbcPreparedStatement extends
AbstractDruidJdbcStatement
closeResultSet();
try {
DirectStatement directStmt = sqlStatement.execute(parameters);
- resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount);
+ resultSet = new DruidJdbcResultSet(this, directStmt, maxRowCount,
fetcherFactory);
resultSet.execute();
}
// Failure to execute does not close the prepared statement.
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
index 2b49401552..1eb0d1aa5e 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcResultSet.java
@@ -22,20 +22,27 @@ package org.apache.druid.sql.avatica;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.calcite.avatica.Meta;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.sql.DirectStatement;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Druid's server-side representation of a JDBC result set. At most one
@@ -59,6 +66,105 @@ import java.util.concurrent.ExecutorService;
*/
public class DruidJdbcResultSet implements Closeable
{
+ private static final Logger LOG = new Logger(DruidJdbcResultSet.class);
+
+ /**
+ * Asynchronous result fetcher. JDBC operates via REST, which is subject to
+ * a timeout if a query takes too long to respond. Fortunately, JDBC uses a
+ * batched API, and is perfectly happy to get an empty batch. This class
+ * runs in a separate thread to fetch a batch. If the fetch takes too long,
+ * the JDBC request thread will time out waiting, will return an empty batch
+ * to the client, and will remember the fetch for use in the next fetch
+ * request. The result is that the time it takes to produce results for long
+ * running queries is decoupled from the HTTP timeout.
+ */
+ public static class ResultFetcher implements Callable<Meta.Frame>
+ {
+ private final int limit;
+ private int batchSize;
+ private int offset;
+ private Yielder<Object[]> yielder;
+
+ public ResultFetcher(
+ final int limit,
+ final Yielder<Object[]> yielder
+ )
+ {
+ this.limit = limit;
+ this.yielder = yielder;
+ }
+
+ /**
+ * In an ideal world, the batch size would be a constructor parameter.
But, JDBC,
+ * oddly, allows a different batch size per request. Hence, we set the
size using
+ * this method before each fetch attempt.
+ */
+ public void setBatchSize(int batchSize)
+ {
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Result is only valid between executions, which turns out to be
+ * the only time it is called.
+ */
+ public int offset()
+ {
+ return offset;
+ }
+
+ /**
+ * Fetch the next batch up to the batch size or EOF. Return
+ * the resulting frame. Exceptions are handled by the executor
+ * framework.
+ */
+ @Override
+ public Meta.Frame call()
+ {
+ Preconditions.checkState(batchSize > 0);
+ int rowCount = 0;
+ final int batchLimit = Math.min(limit - offset, batchSize);
+ final List<Object> rows = new ArrayList<>(batchLimit);
+ while (!yielder.isDone() && rowCount < batchLimit) {
+ rows.add(yielder.get());
+ yielder = yielder.next(null);
+ rowCount++;
+ }
+
+ final Meta.Frame result = new Meta.Frame(offset, yielder.isDone(), rows);
+ offset += rowCount;
+ return result;
+ }
+ }
+
+ /**
+ * Creates the result fetcher and holds config. Rather overkill for
production,
+ * but handy for testing.
+ */
+ public static class ResultFetcherFactory
+ {
+ final int fetchTimeoutMs;
+
+ public ResultFetcherFactory(int fetchTimeoutMs)
+ {
+ // To prevent server hammering, the timeout must be at least 1 second.
+ this.fetchTimeoutMs = Math.max(1000, fetchTimeoutMs);
+ }
+
+ public int fetchTimeoutMs()
+ {
+ return fetchTimeoutMs;
+ }
+
+ public ResultFetcher newFetcher(
+ final int limit,
+ final Yielder<Object[]> yielder
+ )
+ {
+ return new ResultFetcher(limit, yielder);
+ }
+ }
+
/**
* Query metrics can only be used within a single thread. Because results can
* be paginated into multiple JDBC frames (each frame being processed by a
@@ -77,25 +183,46 @@ public class DruidJdbcResultSet implements Closeable
* https://github.com/apache/druid/pull/4288
* https://github.com/apache/druid/pull/4415
*/
- private final ExecutorService yielderOpenCloseExecutor;
+ private final ExecutorService queryExecutor;
private final DirectStatement stmt;
private final long maxRowCount;
+ private final ResultFetcherFactory fetcherFactory;
private State state = State.NEW;
private Meta.Signature signature;
- private Yielder<Object[]> yielder;
- private int offset;
+
+ /**
+ * The fetcher which reads batches of rows. Holds onto the yielder for a
+ * query. Maintains the current read offset.
+ */
+ private ResultFetcher fetcher;
+
+ /**
+ * Future for a fetch that timed out waiting, and should be used again on
+ * the next fetch request.
+ */
+ private Future<Meta.Frame> fetchFuture;
+
+ /**
+ * Cached version of the read offset in case the caller asks for the offset
+ * concurrently with a fetch which may update its own offset. This offset
+ * is that for the last batch that the client fetched: the fetcher itself
+ * may be moving to a new offset.
+ */
+ private int nextFetchOffset;
public DruidJdbcResultSet(
final AbstractDruidJdbcStatement jdbcStatement,
final DirectStatement stmt,
- final long maxRowCount
+ final long maxRowCount,
+ final ResultFetcherFactory fetcherFactory
)
{
this.stmt = stmt;
this.maxRowCount = maxRowCount;
- this.yielderOpenCloseExecutor = Execs.singleThreaded(
+ this.fetcherFactory = fetcherFactory;
+ this.queryExecutor = Execs.singleThreaded(
StringUtils.format(
- "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
+ "JDBCQueryExecutor-connection-%s-statement-%d",
StringUtils.encodeForFormat(jdbcStatement.getConnectionId()),
jdbcStatement.getStatementId()
)
@@ -107,19 +234,22 @@ public class DruidJdbcResultSet implements Closeable
ensure(State.NEW);
try {
state = State.RUNNING;
- final Sequence<Object[]> baseSequence =
yielderOpenCloseExecutor.submit(stmt::execute).get().getResults();
- // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
- final Sequence<Object[]> retSequence =
- maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE
- ? baseSequence.limit((int) maxRowCount)
- : baseSequence;
+ // Execute the first step: plan the query and return a sequence to use
+ // to get values.
+ final Sequence<Object[]> sequence =
queryExecutor.submit(stmt::execute).get().getResults();
- yielder = Yielders.each(retSequence);
+ // Subsequent fetch steps are done via the async "fetcher".
+ fetcher = fetcherFactory.newFetcher(
+ // We can't apply limits greater than Integer.MAX_VALUE, ignore them.
+ maxRowCount >= 0 && maxRowCount <= Integer.MAX_VALUE ? (int)
maxRowCount : Integer.MAX_VALUE,
+ Yielders.each(sequence)
+ );
signature = AbstractDruidJdbcStatement.createSignature(
stmt.prepareResult(),
stmt.query().sql()
);
+ LOG.debug("Opened result set [%s]", stmt.sqlQueryId());
}
catch (ExecutionException e) {
throw closeAndPropagateThrowable(e.getCause());
@@ -143,34 +273,61 @@ public class DruidJdbcResultSet implements Closeable
public synchronized Meta.Frame nextFrame(final long fetchOffset, final int
fetchMaxRowCount)
{
ensure(State.RUNNING, State.DONE);
- Preconditions.checkState(fetchOffset == offset, "fetchOffset [%,d] !=
offset [%,d]", fetchOffset, offset);
+ if (fetchOffset != nextFetchOffset) {
+ throw new IAE(
+ "Druid can only fetch forward. Requested offset of %,d != current
offset %,d",
+ fetchOffset,
+ nextFetchOffset
+ );
+ }
if (state == State.DONE) {
- return new Meta.Frame(fetchOffset, true, Collections.emptyList());
+ LOG.debug("EOF at offset %,d for result set [%s]", fetchOffset,
stmt.sqlQueryId());
+ return new Meta.Frame(fetcher.offset(), true, Collections.emptyList());
}
+ final Future<Meta.Frame> future;
+ if (fetchFuture == null) {
+ // Not waiting on a batch. Request one now.
+ fetcher.setBatchSize(fetchMaxRowCount);
+ future = queryExecutor.submit(fetcher);
+ } else {
+ // Last batch took too long. Continue waiting for it.
+ future = fetchFuture;
+ fetchFuture = null;
+ }
try {
- final List<Object> rows = new ArrayList<>();
- while (!yielder.isDone() && (fetchMaxRowCount < 0 || offset <
fetchOffset + fetchMaxRowCount)) {
- rows.add(yielder.get());
- yielder = yielder.next(null);
- offset++;
- }
-
- if (yielder.isDone()) {
+ Meta.Frame result = future.get(fetcherFactory.fetchTimeoutMs(),
TimeUnit.MILLISECONDS);
+ LOG.debug("Fetched batch at offset %,d for result set [%s]",
fetchOffset, stmt.sqlQueryId());
+ if (result.done) {
state = State.DONE;
}
-
- return new Meta.Frame(fetchOffset, state == State.DONE, rows);
+ nextFetchOffset = fetcher.offset;
+ return result;
}
- catch (Throwable t) {
- throw closeAndPropagateThrowable(t);
+ catch (CancellationException | InterruptedException e) {
+ // Consider this a failure.
+ throw closeAndPropagateThrowable(e);
+ }
+ catch (ExecutionException e) {
+ // Fetch threw an error. Unwrap it.
+ throw closeAndPropagateThrowable(e.getCause());
+ }
+ catch (TimeoutException e) {
+ LOG.debug("Timeout of batch at offset %,d for result set [%s]",
fetchOffset, stmt.sqlQueryId());
+ fetchFuture = future;
+ // Wait timed out. Return 0 rows: the client will try again later.
+ // We'll wait again on this same fetch next time.
+ // Note that when the next fetch request comes, it will use the batch
+ // size set here: any change in size will be ignored for the in-flight
batch.
+ // Changing batch size mid-query is an odd case: it will probably never
happen.
+ return new Meta.Frame(nextFetchOffset, false, Collections.emptyList());
}
}
public synchronized long getCurrentOffset()
{
ensure(State.RUNNING, State.DONE);
- return offset;
+ return fetcher.offset;
}
@GuardedBy("this")
@@ -215,14 +372,28 @@ public class DruidJdbcResultSet implements Closeable
if (state == State.CLOSED || state == State.FAILED) {
return;
}
+ LOG.debug("Closing result set [%s]", stmt.sqlQueryId());
state = State.CLOSED;
try {
- if (yielder != null) {
- Yielder<Object[]> theYielder = this.yielder;
- this.yielder = null;
+ // If a fetch is in progress, wait for it to complete.
+ if (fetchFuture != null) {
+ try {
+ fetchFuture.cancel(true);
+ fetchFuture.get();
+ }
+ catch (Exception e) {
+ // Ignore, we're shutting down anyway.
+ }
+ finally {
+ fetchFuture = null;
+ }
+ }
+ if (fetcher != null) {
+ Yielder<Object[]> theYielder = fetcher.yielder;
+ fetcher = null;
// Put the close last, so any exceptions it throws are after we did
the other cleanup above.
- yielderOpenCloseExecutor.submit(
+ queryExecutor.submit(
() -> {
theYielder.close();
// makes this a Callable instead of Runnable so we don't need to
catch exceptions inside the lambda
@@ -230,7 +401,7 @@ public class DruidJdbcResultSet implements Closeable
}
).get();
- yielderOpenCloseExecutor.shutdownNow();
+ queryExecutor.shutdownNow();
}
}
catch (RuntimeException e) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
index 3b84b7e483..4c342a46fe 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidJdbcStatement.java
@@ -24,6 +24,9 @@ import org.apache.calcite.avatica.Meta;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
+
+import java.util.Map;
/**
* Represents Druid's version of the JDBC {@code Statement} class:
@@ -36,22 +39,27 @@ import org.apache.druid.sql.SqlStatementFactory;
public class DruidJdbcStatement extends AbstractDruidJdbcStatement
{
private final SqlStatementFactory lifecycleFactory;
+ protected final Map<String, Object> queryContext;
public DruidJdbcStatement(
final String connectionId,
final int statementId,
- final SqlStatementFactory lifecycleFactory
+ final Map<String, Object> queryContext,
+ final SqlStatementFactory lifecycleFactory,
+ final ResultFetcherFactory fetcherFactory
)
{
- super(connectionId, statementId);
+ super(connectionId, statementId, fetcherFactory);
+ this.queryContext = queryContext;
this.lifecycleFactory = Preconditions.checkNotNull(lifecycleFactory,
"lifecycleFactory");
}
public synchronized void execute(SqlQueryPlus queryPlus, long maxRowCount)
{
closeResultSet();
+ queryPlus = queryPlus.withContext(queryContext);
DirectStatement stmt = lifecycleFactory.directStatement(queryPlus);
- resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE);
+ resultSet = new DruidJdbcResultSet(this, stmt, Long.MAX_VALUE,
fetcherFactory);
try {
resultSet.execute();
}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
index 6ee3de811c..76efecf68b 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidMeta.java
@@ -49,6 +49,7 @@ import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.joda.time.Interval;
@@ -95,7 +96,20 @@ public class DruidMeta extends MetaImpl
*/
public static <T extends Throwable> T logFailure(T error)
{
- logFailure(error, error.getMessage());
+ if (error instanceof NoSuchConnectionException) {
+ NoSuchConnectionException ex = (NoSuchConnectionException) error;
+ logFailure(error, "No such connection: %s", ex.getConnectionId());
+ } else if (error instanceof NoSuchStatementException) {
+ NoSuchStatementException ex = (NoSuchStatementException) error;
+ logFailure(
+ error,
+ "No such statement: %s, %d",
+ ex.getStatementHandle().connectionId,
+ ex.getStatementHandle().id
+ );
+ } else {
+ logFailure(error, error.getMessage());
+ }
return error;
}
@@ -115,6 +129,7 @@ public class DruidMeta extends MetaImpl
private final AvaticaServerConfig config;
private final List<Authenticator> authenticators;
private final ErrorHandler errorHandler;
+ private final ResultFetcherFactory fetcherFactory;
/**
* Tracks logical connections.
@@ -145,7 +160,8 @@ public class DruidMeta extends MetaImpl
.setDaemon(true)
.build()
),
- authMapper.getAuthenticatorChain()
+ authMapper.getAuthenticatorChain(),
+ new ResultFetcherFactory(config.getFetchTimeoutMs())
);
}
@@ -154,7 +170,8 @@ public class DruidMeta extends MetaImpl
final AvaticaServerConfig config,
final ErrorHandler errorHandler,
final ScheduledExecutorService exec,
- final List<Authenticator> authenticators
+ final List<Authenticator> authenticators,
+ final ResultFetcherFactory fetcherFactory
)
{
super(null);
@@ -163,6 +180,7 @@ public class DruidMeta extends MetaImpl
this.errorHandler = errorHandler;
this.exec = exec;
this.authenticators = authenticators;
+ this.fetcherFactory = fetcherFactory;
}
@Override
@@ -188,11 +206,6 @@ public class DruidMeta extends MetaImpl
try {
openDruidConnection(ch.id, secret, contextMap);
}
- catch (NoSuchConnectionException e) {
- // Avoid sanitizing Avatica specific exceptions so that the Avatica code
- // can rely on them to handle issues in a JDBC-specific way.
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -208,9 +221,6 @@ public class DruidMeta extends MetaImpl
druidConnection.close();
}
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -224,9 +234,6 @@ public class DruidMeta extends MetaImpl
getDruidConnection(ch.id);
return connProps;
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -241,12 +248,10 @@ public class DruidMeta extends MetaImpl
public StatementHandle createStatement(final ConnectionHandle ch)
{
try {
- final DruidJdbcStatement druidStatement =
getDruidConnection(ch.id).createStatement(sqlStatementFactory);
+ final DruidJdbcStatement druidStatement = getDruidConnection(ch.id)
+ .createStatement(sqlStatementFactory, fetcherFactory);
return new StatementHandle(ch.id, druidStatement.getStatementId(), null);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -275,15 +280,13 @@ public class DruidMeta extends MetaImpl
final DruidJdbcPreparedStatement stmt =
getDruidConnection(ch.id).createPreparedStatement(
sqlStatementFactory,
sqlReq,
- maxRowCount
+ maxRowCount,
+ fetcherFactory
);
stmt.prepare();
LOG.debug("Successfully prepared statement [%s] for execution",
stmt.getStatementId());
return new StatementHandle(ch.id, stmt.getStatementId(),
stmt.getSignature());
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -315,7 +318,7 @@ public class DruidMeta extends MetaImpl
}
/**
- * Prepares and executes a JDBC {@code Statement}
+ * Prepares and executes a JDBC {@code Statement}.
*/
@Override
public ExecuteResult prepareAndExecute(
@@ -324,26 +327,25 @@ public class DruidMeta extends MetaImpl
final long maxRowCount,
final int maxRowsInFirstFrame,
final PrepareCallback callback
- ) throws NoSuchStatementException
+ )
{
-
try {
// Ignore "callback", this class is designed for use with LocalService
which doesn't use it.
final DruidJdbcStatement druidStatement = getDruidStatement(statement,
DruidJdbcStatement.class);
final DruidConnection druidConnection =
getDruidConnection(statement.connectionId);
- AuthenticationResult authenticationResult =
doAuthenticate(druidConnection);
- SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
- .auth(authenticationResult)
- .context(druidConnection.sessionContext())
- .build();
- druidStatement.execute(sqlRequest, maxRowCount);
- ExecuteResult result = doFetch(druidStatement, maxRowsInFirstFrame);
- LOG.debug("Successfully prepared statement [%s] and started execution",
druidStatement.getStatementId());
- return result;
- }
- // Cannot affect these exceptions as Avatica handles them.
- catch (NoSuchConnectionException | NoSuchStatementException e) {
- throw e;
+
+ // This method is called directly from the Avatica server: it does not go
+ // through the connection first. We must lock the connection here to
prevent race conditions.
+ synchronized (druidConnection) {
+ final AuthenticationResult authenticationResult =
doAuthenticate(druidConnection);
+ final SqlQueryPlus sqlRequest = SqlQueryPlus.builder(sql)
+ .auth(authenticationResult)
+ .build();
+ druidStatement.execute(sqlRequest, maxRowCount);
+ final ExecuteResult result = doFetch(druidStatement,
maxRowsInFirstFrame);
+ LOG.debug("Successfully prepared statement [%s] and started
execution", druidStatement.getStatementId());
+ return result;
+ }
}
catch (Throwable t) {
throw mapException(t);
@@ -357,6 +359,15 @@ public class DruidMeta extends MetaImpl
*/
private RuntimeException mapException(Throwable t)
{
+ // Don't sanitize or wrap Avatica exceptions: these exceptions
+ // are handled specially by Avatica to provide SQLState, Error Code
+ // and other JDBC-specific items.
+ if (t instanceof AvaticaRuntimeException) {
+ throw (AvaticaRuntimeException) t;
+ }
+ if (t instanceof NoSuchConnectionException) {
+ throw (NoSuchConnectionException) t;
+ }
// BasicSecurityAuthenticationException is not visible here.
String className = t.getClass().getSimpleName();
if (t instanceof ForbiddenException ||
@@ -365,7 +376,8 @@ public class DruidMeta extends MetaImpl
t.getMessage(),
ErrorResponse.UNAUTHORIZED_ERROR_CODE,
ErrorResponse.UNAUTHORIZED_SQL_STATE,
- AvaticaSeverity.ERROR);
+ AvaticaSeverity.ERROR
+ );
}
// Let Avatica do its default mapping.
@@ -425,9 +437,6 @@ public class DruidMeta extends MetaImpl
LOG.debug("Fetching next frame from offset %,d with %,d rows for
statement [%s]", offset, maxRows, statement.id);
return getDruidStatement(statement,
AbstractDruidJdbcStatement.class).nextFrame(offset, maxRows);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -450,7 +459,7 @@ public class DruidMeta extends MetaImpl
final StatementHandle statement,
final List<TypedValue> parameterValues,
final int maxRowsInFirstFrame
- ) throws NoSuchStatementException
+ )
{
try {
final DruidJdbcPreparedStatement druidStatement =
@@ -462,9 +471,6 @@ public class DruidMeta extends MetaImpl
druidStatement.getStatementId());
return result;
}
- catch (NoSuchStatementException | NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -493,9 +499,6 @@ public class DruidMeta extends MetaImpl
druidConnection.closeStatement(h.id);
}
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -506,7 +509,7 @@ public class DruidMeta extends MetaImpl
final StatementHandle sh,
final QueryState state,
final long offset
- ) throws NoSuchStatementException
+ )
{
try {
final AbstractDruidJdbcStatement druidStatement = getDruidStatement(sh,
AbstractDruidJdbcStatement.class);
@@ -521,9 +524,6 @@ public class DruidMeta extends MetaImpl
}
return !isDone;
}
- catch (NoSuchStatementException | NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -560,9 +560,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -597,9 +594,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -656,9 +650,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -726,9 +717,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -747,9 +735,6 @@ public class DruidMeta extends MetaImpl
return sqlResultSet(ch, sql);
}
- catch (NoSuchConnectionException e) {
- throw e;
- }
catch (Throwable t) {
throw mapException(t);
}
@@ -851,7 +836,7 @@ public class DruidMeta extends MetaImpl
return connection.sync(
exec.schedule(
() -> {
- LOG.debug("Connection[%s] timed out.", connectionId);
+ LOG.debug("Connection [%s] timed out.", connectionId);
closeConnection(new ConnectionHandle(connectionId));
},
new Interval(DateTimes.nowUtc(),
config.getConnectionIdleTimeout()).toDurationMillis(),
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 119d10ae4e..9cf65fe196 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -67,6 +68,9 @@ import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcher;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@@ -90,11 +94,12 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.ResultIterator;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -117,30 +122,27 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
/**
* Tests the Avatica-based JDBC implementation using JSON serialization. See
* {@link DruidAvaticaProtobufHandlerTest} for a subclass which runs
* this same set of tests using Protobuf serialization.
+ * To run this in an IDE, set {@code -Duser.timezone=UTC}.
*/
public class DruidAvaticaHandlerTest extends CalciteTestBase
{
- private static final AvaticaServerConfig AVATICA_CONFIG = new
AvaticaServerConfig()
- {
- @Override
- public int getMaxConnections()
- {
- // This must match the number of Connection objects created in
testTooManyStatements()
- return 4;
- }
+ private static final int CONNECTION_LIMIT = 4;
+ private static final int STATEMENT_LIMIT = 4;
+
+ private static final AvaticaServerConfig AVATICA_CONFIG;
+
+ static {
+ AVATICA_CONFIG = new AvaticaServerConfig();
+ // This must match the number of Connection objects created in
testTooManyStatements()
+ AVATICA_CONFIG.maxConnections = CONNECTION_LIMIT;
+ AVATICA_CONFIG.maxStatementsPerConnection = STATEMENT_LIMIT;
+ }
- @Override
- public int getMaxStatementsPerConnection()
- {
- return 4;
- }
- };
private static final String DUMMY_SQL_QUERY_ID = "dummy";
private static QueryRunnerFactoryConglomerate conglomerate;
@@ -162,35 +164,99 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
resourceCloser.close();
}
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
+ private final PlannerConfig plannerConfig = new PlannerConfig();
+ private final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
+ private final ExprMacroTable macroTable =
CalciteTests.createExprMacroTable();
private SpecificSegmentsQuerySegmentWalker walker;
- private Server server;
+ private ServerWrapper server;
private Connection client;
private Connection clientNoTrailingSlash;
private Connection superuserClient;
private Connection clientLosAngeles;
- private DruidMeta druidMeta;
- private String url;
private Injector injector;
private TestRequestLogger testRequestLogger;
+ private DruidSchemaCatalog makeRootSchema()
+ {
+ return CalciteTests.createMockRootSchema(
+ conglomerate,
+ walker,
+ plannerConfig,
+ CalciteTests.TEST_AUTHORIZER_MAPPER
+ );
+ }
+
+ private class ServerWrapper
+ {
+ final DruidMeta druidMeta;
+ final Server server;
+ final String url;
+
+ ServerWrapper(final DruidMeta druidMeta) throws Exception
+ {
+ this.druidMeta = druidMeta;
+ server = new Server(0);
+ server.setHandler(getAvaticaHandler(druidMeta));
+ server.start();
+ url = StringUtils.format(
+ "jdbc:avatica:remote:url=%s%s",
+ server.getURI().toString(),
+ StringUtils.maybeRemoveLeadingSlash(getJdbcUrlTail())
+ );
+ }
+
+ public Connection getConnection(String user, String password) throws
SQLException
+ {
+ return DriverManager.getConnection(url, user, password);
+ }
+
+ public Connection getUserConnection() throws SQLException
+ {
+ return getConnection("regularUser", "druid");
+ }
+
+ // Note: though the URL-only form is OK in general, it will cause tests
+ // to crash as the mock auth test code needs the user name.
+ // Use getUserConnection() instead, or create a URL that includes the
+ // user name and password.
+ //public Connection getConnection() throws SQLException
+ //{
+ // return DriverManager.getConnection(url);
+ //}
+
+ public void close() throws Exception
+ {
+ druidMeta.closeAllConnections();
+ server.stop();
+ }
+ }
+
+ protected String getJdbcUrlTail()
+ {
+ return DruidAvaticaJsonHandler.AVATICA_PATH;
+ }
+
+ // Default implementation is for JSON to allow debugging of tests.
+ protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+ {
+ return new DruidAvaticaJsonHandler(
+ druidMeta,
+ new DruidNode("dummy", "dummy", false, 1, null, true, false),
+ new AvaticaMonitor()
+ );
+ }
+
@Before
public void setUp() throws Exception
{
walker = CalciteTests.createMockWalker(conglomerate,
temporaryFolder.newFolder());
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
- final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
- final DruidSchemaCatalog rootSchema =
- CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig,
CalciteTests.TEST_AUTHORIZER_MAPPER);
+ final DruidSchemaCatalog rootSchema = makeRootSchema();
testRequestLogger = new TestRequestLogger();
injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
@@ -227,37 +293,34 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
)
.build();
- druidMeta = injector.getInstance(DruidMeta.class);
- final AbstractAvaticaHandler handler = this.getAvaticaHandler(druidMeta);
- final int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
- server = new Server(new InetSocketAddress("127.0.0.1", port));
- server.setHandler(handler);
- server.start();
- url = this.getJdbcConnectionString(port);
- client = DriverManager.getConnection(url, "regularUser", "druid");
- superuserClient = DriverManager.getConnection(url,
CalciteTests.TEST_SUPERUSER_NAME, "druid");
- clientNoTrailingSlash =
DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(url),
CalciteTests.TEST_SUPERUSER_NAME, "druid");
+ DruidMeta druidMeta = injector.getInstance(DruidMeta.class);
+ server = new ServerWrapper(druidMeta);
+ client = server.getUserConnection();
+ superuserClient = server.getConnection(CalciteTests.TEST_SUPERUSER_NAME,
"druid");
+ clientNoTrailingSlash =
DriverManager.getConnection(StringUtils.maybeRemoveTrailingSlash(server.url),
CalciteTests.TEST_SUPERUSER_NAME, "druid");
final Properties propertiesLosAngeles = new Properties();
propertiesLosAngeles.setProperty("sqlTimeZone", "America/Los_Angeles");
propertiesLosAngeles.setProperty("user", "regularUserLA");
propertiesLosAngeles.setProperty(BaseQuery.SQL_QUERY_ID,
DUMMY_SQL_QUERY_ID);
- clientLosAngeles = DriverManager.getConnection(url, propertiesLosAngeles);
+ clientLosAngeles = DriverManager.getConnection(server.url,
propertiesLosAngeles);
}
@After
public void tearDown() throws Exception
{
- client.close();
- clientLosAngeles.close();
- clientNoTrailingSlash.close();
- server.stop();
+ if (server != null) {
+ client.close();
+ clientLosAngeles.close();
+ clientNoTrailingSlash.close();
+ server.close();
+ client = null;
+ clientLosAngeles = null;
+ clientNoTrailingSlash = null;
+ server = null;
+ }
walker.close();
walker = null;
- client = null;
- clientLosAngeles = null;
- clientNoTrailingSlash = null;
- server = null;
}
@Test
@@ -772,19 +835,21 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
@Test
public void testTooManyStatements() throws SQLException
{
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < STATEMENT_LIMIT; i++) {
client.createStatement();
}
- expectedException.expect(AvaticaClientRuntimeException.class);
- expectedException.expectMessage("Too many open statements, limit is 4");
- client.createStatement();
+ AvaticaClientRuntimeException ex = Assert.assertThrows(
+ AvaticaClientRuntimeException.class,
+ () -> client.createStatement()
+ );
+ Assert.assertTrue(ex.getMessage().contains("Too many open statements,
limit is 4"));
}
@Test
public void testNotTooManyStatementsWhenYouCloseThem() throws SQLException
{
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < STATEMENT_LIMIT * 2; i++) {
client.createStatement().close();
}
}
@@ -863,7 +928,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
ImmutableList.of(ImmutableMap.of("cnt", 6L)),
getRows(resultSet)
);
- druidMeta.closeAllConnections();
+ server.druidMeta.closeAllConnections();
}
}
@@ -875,70 +940,72 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
superuserClient.createStatement();
clientNoTrailingSlash.createStatement();
- expectedException.expect(AvaticaClientRuntimeException.class);
- expectedException.expectMessage("Too many connections");
-
- DriverManager.getConnection(url);
+ AvaticaClientRuntimeException ex = Assert.assertThrows(
+ AvaticaClientRuntimeException.class,
+ () -> server.getUserConnection()
+ );
+ Assert.assertTrue(ex.getMessage().contains("Too many connections"));
}
@Test
- public void testNotTooManyConnectionsWhenTheyAreEmpty() throws SQLException
+ public void testNotTooManyConnectionsWhenTheyAreClosed() throws SQLException
{
- for (int i = 0; i < 4; i++) {
- try (Connection connection = DriverManager.getConnection(url)) {
+ for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
+ try (Connection connection = server.getUserConnection()) {
}
}
}
@Test
- public void testMaxRowsPerFrame() throws Exception
+ public void testConnectionsCloseStatements() throws SQLException
{
- final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
- {
- @Override
- public int getMaxConnections()
- {
- return 2;
+ for (int i = 0; i < CONNECTION_LIMIT * 2; i++) {
+ try (Connection connection = server.getUserConnection()) {
+ // Note: NOT in a try-with-resources block. Let the connection close the
statement
+ final Statement statement = connection.createStatement();
+
+ // Again, NOT in a try-with-resources block: let the statement close the
+ // result set.
+ final ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) AS
cnt FROM druid.foo");
+ Assert.assertTrue(resultSet.next());
}
+ }
+ }
- @Override
- public int getMaxStatementsPerConnection()
- {
- return 4;
- }
+ private SqlStatementFactory makeStatementFactory()
+ {
+ return CalciteTests.createSqlStatementFactory(
+ CalciteTests.createMockSqlEngine(walker, conglomerate),
+ new PlannerFactory(
+ makeRootSchema(),
+ operatorTable,
+ macroTable,
+ plannerConfig,
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ CalciteTests.getJsonMapper(),
+ CalciteTests.DRUID_SCHEMA_NAME,
+ new CalciteRulesManager(ImmutableSet.of())
+ )
+ );
+ }
- @Override
- public int getMaxRowsPerFrame()
- {
- return 2;
- }
- };
+ @Test
+ public void testMaxRowsPerFrame() throws Exception
+ {
+ final AvaticaServerConfig config = new AvaticaServerConfig();
+ config.maxConnections = 2;
+ config.maxStatementsPerConnection = STATEMENT_LIMIT;
+ config.maxRowsPerFrame = 2;
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
- final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
- DruidSchemaCatalog rootSchema =
- CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
- CalciteTests.createSqlStatementFactory(
- CalciteTests.createMockSqlEngine(walker, conglomerate),
- new PlannerFactory(
- rootSchema,
- operatorTable,
- macroTable,
- plannerConfig,
- AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- CalciteTests.getJsonMapper(),
- CalciteTests.DRUID_SCHEMA_NAME,
- new CalciteRulesManager(ImmutableSet.of())
- )
- ),
- smallFrameConfig,
+ makeStatementFactory(),
+ config,
new ErrorHandler(new ServerConfig()),
exec,
- injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
+
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+ new ResultFetcherFactory(config.fetchTimeoutMs)
)
{
@Override
@@ -955,13 +1022,8 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
}
};
- final AbstractAvaticaHandler handler =
this.getAvaticaHandler(smallFrameDruidMeta);
- final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
- Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1",
port));
- smallFrameServer.setHandler(handler);
- smallFrameServer.start();
- String smallFrameUrl = this.getJdbcConnectionString(port);
- Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl,
"regularUser", "druid");
+ ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
+ Connection smallFrameClient = server.getUserConnection();
final ResultSet resultSet =
smallFrameClient.createStatement().executeQuery(
"SELECT dim1 FROM druid.foo"
@@ -980,59 +1042,29 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
rows
);
+ resultSet.close();
+ smallFrameClient.close();
exec.shutdown();
+ server.close();
}
@Test
public void testMinRowsPerFrame() throws Exception
{
- final int minFetchSize = 1000;
- final AvaticaServerConfig smallFrameConfig = new AvaticaServerConfig()
- {
- @Override
- public int getMaxConnections()
- {
- return 2;
- }
-
- @Override
- public int getMaxStatementsPerConnection()
- {
- return 4;
- }
+ final AvaticaServerConfig config = new AvaticaServerConfig();
+ config.maxConnections = 2;
+ config.maxStatementsPerConnection = STATEMENT_LIMIT;
+ config.minRowsPerFrame = 1000;
- @Override
- public int getMinRowsPerFrame()
- {
- return minFetchSize;
- }
- };
-
- final PlannerConfig plannerConfig = new PlannerConfig();
- final DruidOperatorTable operatorTable =
CalciteTests.createOperatorTable();
- final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
final List<Meta.Frame> frames = new ArrayList<>();
final ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
- DruidSchemaCatalog rootSchema =
- CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER);
DruidMeta smallFrameDruidMeta = new DruidMeta(
- CalciteTests.createSqlStatementFactory(
- CalciteTests.createMockSqlEngine(walker, conglomerate),
- new PlannerFactory(
- rootSchema,
- operatorTable,
- macroTable,
- plannerConfig,
- AuthTestUtils.TEST_AUTHORIZER_MAPPER,
- CalciteTests.getJsonMapper(),
- CalciteTests.DRUID_SCHEMA_NAME,
- new CalciteRulesManager(ImmutableSet.of())
- )
- ),
- smallFrameConfig,
+ makeStatementFactory(),
+ config,
new ErrorHandler(new ServerConfig()),
exec,
- injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain()
+
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+ new ResultFetcherFactory(config.fetchTimeoutMs)
)
{
@Override
@@ -1043,20 +1075,15 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
) throws NoSuchStatementException, MissingResultsException
{
// overriding fetch allows us to track how many frames are processed
after the first frame, and also fetch size
- Assert.assertEquals(minFetchSize, fetchMaxRowCount);
+ Assert.assertEquals(config.minRowsPerFrame, fetchMaxRowCount);
Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
frames.add(frame);
return frame;
}
};
- final AbstractAvaticaHandler handler =
this.getAvaticaHandler(smallFrameDruidMeta);
- final int port = ThreadLocalRandom.current().nextInt(9999) + 20000;
- Server smallFrameServer = new Server(new InetSocketAddress("127.0.0.1",
port));
- smallFrameServer.setHandler(handler);
- smallFrameServer.start();
- String smallFrameUrl = this.getJdbcConnectionString(port);
- Connection smallFrameClient = DriverManager.getConnection(smallFrameUrl,
"regularUser", "druid");
+ ServerWrapper server = new ServerWrapper(smallFrameDruidMeta);
+ Connection smallFrameClient = server.getUserConnection();
// use a prepared statement because Avatica currently ignores fetchSize on
the initial fetch of a Statement
PreparedStatement statement = smallFrameClient.prepareStatement("SELECT
dim1 FROM druid.foo");
@@ -1078,7 +1105,10 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
rows
);
+ resultSet.close();
+ smallFrameClient.close();
exec.shutdown();
+ server.close();
}
@Test
@@ -1546,24 +1576,124 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
Assert.fail("Test failed, did not get SQLException");
}
- // Default implementation is for JSON to allow debugging of tests.
- protected String getJdbcConnectionString(final int port)
+ private static class TestResultFetcher extends ResultFetcher
{
- return StringUtils.format(
- "jdbc:avatica:remote:url=http://127.0.0.1:%d%s",
- port,
- DruidAvaticaJsonHandler.AVATICA_PATH
- );
+ public TestResultFetcher(int limit, Yielder<Object[]> yielder)
+ {
+ super(limit, yielder);
+ }
+
+ @Override
+ public Meta.Frame call()
+ {
+ try {
+ if (offset() == 0) {
+ System.out.println("Taking a nap now...");
+ Thread.sleep(3000);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return super.call();
+ }
}
- // Default implementation is for JSON to allow debugging of tests.
- protected AbstractAvaticaHandler getAvaticaHandler(final DruidMeta druidMeta)
+ /**
+ * Test the async aspect of the Avatica implementation. The fetch of the
+ * first batch takes 3 seconds (due to a sleep). However, the client will
+ * wait only 1 second. So, we should get ~3 empty batches before we get
+ * the first batch with rows.
+ */
+ @Test
+ public void testAsync() throws Exception
{
- return new DruidAvaticaJsonHandler(
- druidMeta,
- new DruidNode("dummy", "dummy", false, 1, null, true, false),
- new AvaticaMonitor()
- );
+ final AvaticaServerConfig config = new AvaticaServerConfig();
+ config.maxConnections = CONNECTION_LIMIT;
+ config.maxStatementsPerConnection = STATEMENT_LIMIT;
+ config.maxRowsPerFrame = 2;
+ config.fetchTimeoutMs = 1000;
+
+ final List<Meta.Frame> frames = new ArrayList<>();
+ final ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("testMaxRowsPerFrame");
+ DruidMeta druidMeta = new DruidMeta(
+ makeStatementFactory(),
+ config,
+ new ErrorHandler(new ServerConfig()),
+ exec,
+
injector.getInstance(AuthenticatorMapper.class).getAuthenticatorChain(),
+ new ResultFetcherFactory(config.fetchTimeoutMs) {
+ @Override
+ public ResultFetcher newFetcher(
+ final int limit,
+ final Yielder<Object[]> yielder
+ )
+ {
+ return new TestResultFetcher(limit, yielder);
+ }
+ }
+ )
+ {
+ @Override
+ public Frame fetch(
+ final StatementHandle statement,
+ final long offset,
+ final int fetchMaxRowCount
+ ) throws NoSuchStatementException, MissingResultsException
+ {
+ Frame frame = super.fetch(statement, offset, fetchMaxRowCount);
+ frames.add(frame);
+ return frame;
+ }
+ };
+
+ ServerWrapper server = new ServerWrapper(druidMeta);
+ try (Connection conn = server.getUserConnection()) {
+
+ // Test with plain JDBC
+ try (ResultSet resultSet = conn.createStatement().executeQuery(
+ "SELECT dim1 FROM druid.foo")) {
+ List<Map<String, Object>> rows = getRows(resultSet);
+ Assert.assertEquals(6, rows.size());
+ Assert.assertTrue(frames.size() > 3);
+
+ // There should be at least one empty frame due to timeout
+ Assert.assertFalse(frames.get(0).rows.iterator().hasNext());
+ }
+ }
+
+ testWithJDBI(server.url);
+
+ exec.shutdown();
+ server.close();
+ }
+
+ // Test the async feature using DBI, as used internally in Druid.
+ // Ensures that DBI knows how to handle empty batches (which should,
+ // in reality, be handled at the JDBC level below DBI.)
+ private void testWithJDBI(String baseUrl)
+ {
+ String url = baseUrl + "?user=regularUser&password=druid" +
getJdbcUrlTail();
+ System.out.println(url);
+ DBI dbi = new DBI(url);
+ Handle handle = dbi.open();
+ try {
+ ResultIterator<Pair<Long, String>> iter = handle
+ .createQuery("SELECT __time, dim1 FROM druid.foo")
+ .map((index, row, ctx) -> new Pair<>(row.getLong(1),
row.getString(2)))
+ .iterator();
+ int count = 0;
+ while (iter.hasNext()) {
+ Pair<Long, String> row = iter.next();
+ Assert.assertNotNull(row.lhs);
+ Assert.assertNotNull(row.rhs);
+ count++;
+ }
+ Assert.assertEquals(6, count);
+ }
+ finally {
+ handle.close();
+ }
}
private static List<Map<String, Object>> getRows(final ResultSet resultSet)
throws SQLException
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
index af447f6c13..bbcb0dba92 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaProtobufHandlerTest.java
@@ -26,11 +26,10 @@ import org.apache.druid.server.DruidNode;
public class DruidAvaticaProtobufHandlerTest extends DruidAvaticaHandlerTest
{
@Override
- protected String getJdbcConnectionString(final int port)
+ protected String getJdbcUrlTail()
{
return StringUtils.format(
-
"jdbc:avatica:remote:url=http://127.0.0.1:%d%s;serialization=protobuf",
- port,
+ "%s;serialization=protobuf",
DruidAvaticaProtobufHandler.AVATICA_PATH
);
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index e5dc4a662a..f9b0718d1e 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.avatica.DruidJdbcResultSet.ResultFetcherFactory;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -138,7 +139,9 @@ public class DruidStatementTest extends CalciteTestBase
return new DruidJdbcStatement(
"",
0,
- sqlStatementFactory
+ Collections.emptyMap(),
+ sqlStatementFactory,
+ new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
);
}
@@ -517,7 +520,8 @@ public class DruidStatementTest extends CalciteTestBase
"",
0,
sqlStatementFactory.preparedStatement(queryPlus),
- Long.MAX_VALUE
+ Long.MAX_VALUE,
+ new ResultFetcherFactory(AvaticaServerConfig.DEFAULT_FETCH_TIMEOUT_MS)
);
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 0370ad72a0..fc0a845ca9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -185,19 +185,19 @@ public class CalciteTests
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
- if (authenticationResult.getIdentity().equals(TEST_SUPERUSER_NAME)) {
+ if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) {
return Access.OK;
}
switch (resource.getType()) {
case ResourceType.DATASOURCE:
- if (resource.getName().equals(FORBIDDEN_DATASOURCE)) {
+ if (FORBIDDEN_DATASOURCE.equals(resource.getName())) {
return new Access(false);
} else {
return Access.OK;
}
case ResourceType.VIEW:
- if (resource.getName().equals("forbiddenView")) {
+ if ("forbiddenView".equals(resource.getName())) {
return new Access(false);
} else {
return Access.OK;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]