Repository: ignite Updated Branches: refs/heads/ignite-1786 fe63a9086 -> 6a7cb0a9c
IGNITE-2643: Fixed. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a7cb0a9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a7cb0a9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a7cb0a9 Branch: refs/heads/ignite-1786 Commit: 6a7cb0a9c242708c0e267465e812a0ca90bd8459 Parents: fe63a90 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Feb 18 16:54:45 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Feb 18 16:54:45 2016 +0300 ---------------------------------------------------------------------- .../ignite/configuration/OdbcConfiguration.java | 27 +++++++ .../processors/odbc/OdbcNioListener.java | 76 +++++--------------- .../internal/processors/odbc/OdbcProcessor.java | 2 +- .../processors/odbc/OdbcRequestHandler.java | 69 ++++++++++++------ 4 files changed, 95 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java index d6f0500..8f0a0fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java @@ -38,6 +38,9 @@ public class OdbcConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default max number of open cursors per connection. */ + public static final int DFLT_MAX_OPEN_CURSORS = 128; + /** TCP port. */ private int port = DFLT_TCP_PORT; @@ -65,6 +68,9 @@ public class OdbcConfiguration { /** Idle timeout. */ private long idleTimeout = DFLT_IDLE_TIMEOUT; + /** Max number of opened cursors per connection. */ + private int maxOpenCursors = DFLT_MAX_OPEN_CURSORS; + /** * Creates ODBC server configuration with all default values. */ @@ -84,6 +90,7 @@ public class OdbcConfiguration { directBuf = cfg.isDirectBuffer(); host = cfg.getHost(); idleTimeout = cfg.getIdleTimeout(); + maxOpenCursors = cfg.getMaxOpenCursors(); noDelay = cfg.isNoDelay(); port = cfg.getPort(); rcvBufSize = cfg.getReceiveBufferSize(); @@ -289,4 +296,24 @@ public class OdbcConfiguration { public void setIdleTimeout(long idleTimeout) { this.idleTimeout = idleTimeout; } + + /** + * Gets maximum number of opened cursors per connection. + * <p> + * Defaults to {@link #DFLT_MAX_OPEN_CURSORS}. + * + * @return Maximum number of opened cursors. + */ + public int getMaxOpenCursors() { + return maxOpenCursors; + } + + /** + * Sets maximum number of opened cursors per connection. See {@link #getMaxOpenCursors()}. + * + * @param maxOpenCursors Maximum number of opened cursors. + */ + public void setMaxOpenCursors(int maxOpenCursors) { + this.maxOpenCursors = maxOpenCursors; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java index 23560b1..e25ae5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; import org.jetbrains.annotations.Nullable; import java.io.IOException; @@ -42,14 +43,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { /** Initial output stream capacity. */ private static final int INIT_CAP = 1024; + /** Handler metadata key. */ + private static final int HANDLER_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** Request ID generator. */ private static final AtomicLong REQ_ID_GEN = new AtomicLong(); /** Busy lock. */ private final GridSpinBusyLock busyLock; - /** Request handler. */ - private final OdbcRequestHandler handler; + /** Kernal context. */ + private final GridKernalContext ctx; /** Marshaller. */ private final GridBinaryMarshaller marsh; @@ -60,15 +64,14 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { /** * @param ctx Context. * @param busyLock Shutdown busy lock. - * @param handler Request handler. */ - public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, OdbcRequestHandler handler) { + public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock) { + this.ctx = ctx; this.busyLock = busyLock; - this.handler = handler; CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects(); - marsh = cacheObjProc.marshaller(); + this.marsh = cacheObjProc.marshaller(); this.log = ctx.log(getClass()); } @@ -77,6 +80,8 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { @Override public void onConnected(GridNioSession ses) { if (log.isDebugEnabled()) log.debug("ODBC client connected: " + ses.remoteAddress()); + + ses.addMeta(HANDLER_META_KEY, new OdbcRequestHandler(ctx, busyLock)); } /** {@inheritDoc} */ @@ -102,10 +107,15 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { if (log.isDebugEnabled()) { startTime = System.nanoTime(); - log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() + ", req=" + req + ']'); + log.debug("ODBC request received [id=" + reqId + ", addr=" + ses.remoteAddress() + + ", req=" + req + ']'); } - OdbcResponse resp = handle(req); + OdbcRequestHandler handler = ses.meta(HANDLER_META_KEY); + + assert handler != null; + + OdbcResponse resp = handler.handle(req); if (log.isDebugEnabled()) { long dur = (System.nanoTime() - startTime) / 1000; @@ -160,16 +170,10 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { switch (cmd) { case OdbcRequest.EXECUTE_SQL_QUERY: { - String cache = reader.readString(); String sql = reader.readString(); int argsNum = reader.readInt(); - if (log.isDebugEnabled()) { - log.debug("Message: [cmd=EXECUTE_SQL_QUERY, cache=" + cache + - ", query=" + sql + ", argsNum=" + argsNum + ']'); - } - Object[] params = new Object[argsNum]; for (int i = 0; i < argsNum; ++i) @@ -181,26 +185,17 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { } case OdbcRequest.FETCH_SQL_QUERY: { - long queryId = reader.readLong(); int pageSize = reader.readInt(); - if (log.isDebugEnabled()) - log.debug("Message: [cmd=FETCH_SQL_QUERY, queryId=" + queryId + ", pageSize=" + pageSize + ']'); - res = new OdbcQueryFetchRequest(queryId, pageSize); break; } case OdbcRequest.CLOSE_SQL_QUERY: { - long queryId = reader.readLong(); - if (log.isDebugEnabled()) { - log.debug("Message: [cmd=CLOSE_SQL_QUERY, queryId=" + queryId + ']'); - } - res = new OdbcQueryCloseRequest(queryId); break; @@ -212,36 +207,24 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { String table = reader.readString(); String column = reader.readString(); - if (log.isDebugEnabled()) { - log.debug("Message: [cmd=GET_COLUMNS_META, cache=" + cache + - ", table=" + table + ", column: " + column + ']'); - } - res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); break; } case OdbcRequest.GET_TABLES_META: { - String catalog = reader.readString(); String schema = reader.readString(); String table = reader.readString(); String tableType = reader.readString(); - if (log.isDebugEnabled()) { - log.debug("Message: [cmd=GET_COLUMNS_META, catalog=" + catalog + - ", schema=" + schema + ", table=" + table + ", tableType=" + tableType + ']'); - } - res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); break; } default: - throw new IOException("Failed to parse incoming packet (unknown command type) " + - "[cmd=[" + Byte.toString(cmd) + ']'); + throw new IOException("Unknown ODBC command: " + cmd); } return res; @@ -352,25 +335,4 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { return writer.array(); } - - /** - * Handle request. - * - * @param req Request. - * @return Response. - */ - private OdbcResponse handle(OdbcRequest req) { - assert req != null; - - if (!busyLock.enterBusy()) - return new OdbcResponse(OdbcResponse.STATUS_FAILED, - "Failed to handle ODBC request because node is stopping: " + req); - - try { - return handler.handle(req); - } - finally { - busyLock.leaveBusy(); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java index 9d22e4a..87be686 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java @@ -73,7 +73,7 @@ public class OdbcProcessor extends GridProcessorAdapter { srv = GridNioServer.<byte[]>builder() .address(host) .port(port) - .listener(new OdbcNioListener(ctx, busyLock, new OdbcRequestHandler(ctx))) + .listener(new OdbcNioListener(ctx, busyLock)) .logger(log) .selectorCount(odbcCfg.getSelectorCount()) .gridName(ctx.gridName()) http://git-wip-us.apache.org/repos/asf/ignite/blob/6a7cb0a9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java index 332a5cc..1af14b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.odbc; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.OdbcConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiTuple; @@ -43,16 +45,21 @@ public class OdbcRequestHandler { /** Kernel context. */ private final GridKernalContext ctx; + /** Busy lock. */ + private final GridSpinBusyLock busyLock; + /** Current queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>(); /** * Constructor. * * @param ctx Context. + * @param busyLock Shutdown latch. */ - public OdbcRequestHandler(final GridKernalContext ctx) { + public OdbcRequestHandler(final GridKernalContext ctx, final GridSpinBusyLock busyLock) { this.ctx = ctx; + this.busyLock = busyLock; } /** @@ -64,24 +71,33 @@ public class OdbcRequestHandler { public OdbcResponse handle(OdbcRequest req) { assert req != null; - switch (req.command()) { - case EXECUTE_SQL_QUERY: - return executeQuery((OdbcQueryExecuteRequest)req); + if (!busyLock.enterBusy()) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, + "Failed to handle ODBC request because node is stopping: " + req); - case FETCH_SQL_QUERY: - return fetchQuery((OdbcQueryFetchRequest)req); + try { + switch (req.command()) { + case EXECUTE_SQL_QUERY: + return executeQuery((OdbcQueryExecuteRequest)req); - case CLOSE_SQL_QUERY: - return closeQuery((OdbcQueryCloseRequest)req); + case FETCH_SQL_QUERY: + return fetchQuery((OdbcQueryFetchRequest)req); - case GET_COLUMNS_META: - return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req); + case CLOSE_SQL_QUERY: + return closeQuery((OdbcQueryCloseRequest)req); - case GET_TABLES_META: - return getTablesMeta((OdbcQueryGetTablesMetaRequest) req); - } + case GET_COLUMNS_META: + return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req); - return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req); + case GET_TABLES_META: + return getTablesMeta((OdbcQueryGetTablesMetaRequest) req); + } + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req); + } + finally { + busyLock.leaveBusy(); + } } /** @@ -91,6 +107,17 @@ public class OdbcRequestHandler { * @return Response. */ private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) { + OdbcConfiguration cfg = ctx.config().getOdbcConfiguration(); + + assert cfg != null; + + int cursorCnt = qryCursors.size(); + + if (cursorCnt >= cfg.getMaxOpenCursors()) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Too many opened cursors (either close other " + + "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " + + "[maximum=" + cfg.getMaxOpenCursors() + ", current=" + cursorCnt + ']'); + long qryId = QRY_ID_GEN.getAndIncrement(); try { @@ -108,7 +135,7 @@ public class OdbcRequestHandler { Iterator iter = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, iter)); + qryCursors.put(qryId, new IgniteBiTuple<>(qryCur, iter)); List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); @@ -117,7 +144,7 @@ public class OdbcRequestHandler { return new OdbcResponse(res); } catch (Exception e) { - qryCurs.remove(qryId); + qryCursors.remove(qryId); return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); } @@ -131,21 +158,21 @@ public class OdbcRequestHandler { */ private OdbcResponse closeQuery(OdbcQueryCloseRequest req) { try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + QueryCursor cur = qryCursors.get(req.queryId()).get1(); if (cur == null) return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); cur.close(); - qryCurs.remove(req.queryId()); + qryCursors.remove(req.queryId()); OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId()); return new OdbcResponse(res); } catch (Exception e) { - qryCurs.remove(req.queryId()); + qryCursors.remove(req.queryId()); return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); } @@ -159,7 +186,7 @@ public class OdbcRequestHandler { */ private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + Iterator cur = qryCursors.get(req.queryId()).get2(); if (cur == null) return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId());