Repository: ignite Updated Branches: refs/heads/master 1df5a268d -> 01f60542e
http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index b9c9cdd..f8b1c40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -28,19 +28,23 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import javax.cache.configuration.Factory; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.authentication.AuthorizationContext; import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; -import org.apache.ignite.internal.processors.authentication.AuthorizationContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; @@ -60,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE; @@ -69,6 +74,7 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionCont import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_5_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES; @@ -109,6 +115,15 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Current bulk load processors. */ private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>(); + /** Ordered batches queue. */ + private final PriorityQueue<JdbcOrderedBatchExecuteRequest> orderedBatchesQueue = new PriorityQueue<>(); + + /** Ordered batches mutex. */ + private final Object orderedBatchesMux = new Object(); + + /** Response sender. */ + private final JdbcResponseSender sender; + /** Automatic close of cursors. */ private final boolean autoCloseCursors; @@ -122,6 +137,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * Constructor. * @param ctx Context. * @param busyLock Shutdown latch. + * @param sender Results sender. * @param maxCursors Maximum allowed cursors. * @param distributedJoins Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. @@ -133,14 +149,23 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @param actx Authentication context. * @param protocolVer Protocol version. */ - public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, + public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, + JdbcResponseSender sender, int maxCursors, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate, AuthorizationContext actx, ClientListenerProtocolVersion protocolVer) { this.ctx = ctx; + this.sender = sender; + + Factory<GridWorker> orderedFactory = new Factory<GridWorker>() { + @Override public GridWorker create() { + return new OrderedBatchWorker(); + } + }; this.cliCtx = new SqlClientContext( ctx, + orderedFactory, distributedJoins, enforceJoinOrder, collocated, @@ -190,6 +215,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { case BATCH_EXEC: return executeBatch((JdbcBatchExecuteRequest)req); + case BATCH_EXEC_ORDERED: + return dispatchBatchOrdered((JdbcOrderedBatchExecuteRequest)req); + case META_TABLES: return getTablesMeta((JdbcMetaTablesRequest)req); @@ -223,6 +251,55 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } /** + * @param req Ordered batch request. + * @return Response. + */ + private ClientListenerResponse dispatchBatchOrdered(JdbcOrderedBatchExecuteRequest req) { + synchronized (orderedBatchesMux) { + orderedBatchesQueue.add(req); + + orderedBatchesMux.notify(); + } + + if (!cliCtx.isStreamOrdered()) + executeBatchOrdered(req); + + return null; + } + + /** + * @param req Ordered batch request. + * @return Response. + */ + private ClientListenerResponse executeBatchOrdered(JdbcOrderedBatchExecuteRequest req) { + try { + if (req.isLastStreamBatch()) + cliCtx.waitTotalProcessedOrderedRequests(req.order()); + + JdbcResponse resp = (JdbcResponse)executeBatch(req); + + if (resp.response() instanceof JdbcBatchExecuteResult) { + resp = new JdbcResponse( + new JdbcOrderedBatchExecuteResult((JdbcBatchExecuteResult)resp.response(), req.order())); + } + + sender.send(resp); + } catch (Exception e) { + U.error(null, "Error processing file batch", e); + + sender.send(new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e)); + } + + synchronized (orderedBatchesMux) { + orderedBatchesQueue.poll(); + } + + cliCtx.orderedRequestProcessed(); + + return null; + } + + /** * Processes a file batch sent from client as part of bulk load COPY command. * * @param req Request object with a batch of a file received from client. @@ -948,4 +1025,42 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { else return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, e.toString()); } + + /** + * Ordered batch worker. + */ + private class OrderedBatchWorker extends GridWorker { + /** + * Constructor. + */ + OrderedBatchWorker() { + super(ctx.igniteInstanceName(), "ordered-batch", JdbcRequestHandler.this.log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + long nextBatchOrder = 0; + + while (true) { + if (!cliCtx.isStream()) + return; + + JdbcOrderedBatchExecuteRequest req; + + synchronized (orderedBatchesMux) { + req = orderedBatchesQueue.peek(); + + if (req == null || req.order() != nextBatchOrder) { + orderedBatchesMux.wait(); + + continue; + } + } + + executeBatchOrdered(req); + + nextBatchOrder++; + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java new file mode 100644 index 0000000..128bcee --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; + +/** + * JDBC response result sender. + */ +public interface JdbcResponseSender { + /** + * Send response to the client. Used for asynchronous result send. + * @param resp JDBC response. + */ + public void send(ClientListenerResponse resp); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java index 4fea207..199e5da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java @@ -71,6 +71,9 @@ public class JdbcResult implements JdbcRawBinarylizable { /** Columns metadata result V4. */ static final byte META_COLUMNS_V4 = 17; + /** A result of the processing ordered batch request. */ + static final byte BATCH_EXEC_ORDERED = 18; + /** Success status. */ private byte type; @@ -179,6 +182,11 @@ public class JdbcResult implements JdbcRawBinarylizable { break; + case BATCH_EXEC_ORDERED: + res = new JdbcOrderedBatchExecuteResult(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java index e8c2932..42dbae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java @@ -20,10 +20,14 @@ package org.apache.ignite.internal.processors.query; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import javax.cache.configuration.Factory; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; /** * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) having notion of an @@ -53,6 +57,9 @@ public class SqlClientContext implements AutoCloseable { /** Skip reducer on update flag. */ private final boolean skipReducerOnUpdate; + /** Monitor. */ + private final Object mux = new Object(); + /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */ private boolean streamAllowOverwrite; @@ -65,14 +72,28 @@ public class SqlClientContext implements AutoCloseable { /** Auto flush frequency for streaming. */ private long streamFlushTimeout; + /** Stream ordered. */ + private boolean streamOrdered; + /** Streamers for various caches. */ - private Map<String, IgniteDataStreamer<?, ?>> streamers; + private volatile Map<String, IgniteDataStreamer<?, ?>> streamers; + + /** Ordered batch thread. */ + private IgniteThread orderedBatchThread; + + /** Ordered batch worker factory. */ + private Factory<GridWorker> orderedBatchWorkerFactory; + + /** Count of the processed ordered batch requests. Used to wait end of processing all request before starts + * the processing the last request. */ + private long totalProcessedOrderedReqs; /** Logger. */ private final IgniteLogger log; /** * @param ctx Kernal context. + * @param orderedBatchWorkerFactory Ordered batch worker factory. * @param distributedJoins Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. * @param collocated Collocated flag. @@ -80,9 +101,11 @@ public class SqlClientContext implements AutoCloseable { * @param lazy Lazy query execution flag. * @param skipReducerOnUpdate Skip reducer on update flag. */ - public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder, + public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> orderedBatchWorkerFactory, + boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate) { this.ctx = ctx; + this.orderedBatchWorkerFactory = orderedBatchWorkerFactory; this.distributedJoins = distributedJoins; this.enforceJoinOrder = enforceJoinOrder; this.collocated = collocated; @@ -100,37 +123,52 @@ public class SqlClientContext implements AutoCloseable { * @param flushFreq Flush frequency for streamers. * @param perNodeBufSize Per node streaming buffer size. * @param perNodeParOps Per node streaming parallel operations number. + * @param ordered Ordered stream flag. */ - public void enableStreaming(boolean allowOverwrite, long flushFreq, int perNodeBufSize, int perNodeParOps) { - if (isStream()) - return; + public void enableStreaming(boolean allowOverwrite, long flushFreq, int perNodeBufSize, + int perNodeParOps, boolean ordered) { + synchronized (mux) { + if (isStream()) + return; + + streamers = new HashMap<>(); - streamers = new HashMap<>(); + this.streamAllowOverwrite = allowOverwrite; + this.streamFlushTimeout = flushFreq; + this.streamNodeBufSize = perNodeBufSize; + this.streamNodeParOps = perNodeParOps; + this.streamOrdered = ordered; - this.streamAllowOverwrite = allowOverwrite; - this.streamFlushTimeout = flushFreq; - this.streamNodeBufSize = perNodeBufSize; - this.streamNodeParOps = perNodeParOps; + if (ordered) { + orderedBatchThread = new IgniteThread(orderedBatchWorkerFactory.create()); + + orderedBatchThread.start(); + } + } } /** * Turn off streaming on this client context - with closing all open streamers, if any. */ public void disableStreaming() { - if (!isStream()) - return; + synchronized (mux) { + if (!isStream()) + return; - Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator(); + Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator(); - while (it.hasNext()) { - IgniteDataStreamer<?, ?> streamer = it.next(); + while (it.hasNext()) { + IgniteDataStreamer<?, ?> streamer = it.next(); - U.close(streamer, log); + U.close(streamer, log); - it.remove(); - } + it.remove(); + } + + streamers = null; - streamers = null; + orderedBatchThread = null; + } } /** @@ -179,7 +217,18 @@ public class SqlClientContext implements AutoCloseable { * @return Streaming state flag (on or off). */ public boolean isStream() { - return streamers != null; + synchronized (mux) { + return streamers != null; + } + } + + /** + * @return Stream ordered flag. + */ + public boolean isStreamOrdered() { + synchronized (mux) { + return streamOrdered; + } } /** @@ -187,29 +236,59 @@ public class SqlClientContext implements AutoCloseable { * @return Streamer for given cache. */ public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) { - if (streamers == null) - return null; + synchronized (mux) { + if (streamers == null) + return null; - IgniteDataStreamer<?, ?> res = streamers.get(cacheName); + IgniteDataStreamer<?, ?> res = streamers.get(cacheName); - if (res != null) - return res; + if (res != null) + return res; - res = ctx.grid().dataStreamer(cacheName); + res = ctx.grid().dataStreamer(cacheName); - res.autoFlushFrequency(streamFlushTimeout); + res.autoFlushFrequency(streamFlushTimeout); - res.allowOverwrite(streamAllowOverwrite); + res.allowOverwrite(streamAllowOverwrite); - if (streamNodeBufSize > 0) - res.perNodeBufferSize(streamNodeBufSize); + if (streamNodeBufSize > 0) + res.perNodeBufferSize(streamNodeBufSize); - if (streamNodeParOps > 0) - res.perNodeParallelOperations(streamNodeParOps); + if (streamNodeParOps > 0) + res.perNodeParallelOperations(streamNodeParOps); - streamers.put(cacheName, res); + streamers.put(cacheName, res); - return res; + return res; + } + } + + /** + * Waits when total processed ordered requests count to be equal to specified value. + * @param total Expected total processed request. + */ + public void waitTotalProcessedOrderedRequests(long total) { + synchronized (mux) { + while (totalProcessedOrderedReqs < total) { + try { + mux.wait(); + } + catch (InterruptedException e) { + throw new IgniteException("Waiting for end of processing the last batch is interrupted", e); + } + } + } + } + + /** + * + */ + public void orderedRequestProcessed() { + synchronized (mux) { + totalProcessedOrderedReqs++; + + mux.notify(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java index 0fd08f4..be76482 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java @@ -185,6 +185,9 @@ public class SqlKeyword { /** Keyword: ON. */ public static final String ON = "ON"; + /** Keyword: ORDERED. */ + public static final String ORDERED = "ORDERED"; + /** Keyword: PER_NODE_PARALLEL_OPERATIONS. */ public static final String PER_NODE_PARALLEL_OPERATIONS = "PER_NODE_PARALLEL_OPERATIONS"; http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java index c492c61..42f17d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java @@ -53,6 +53,9 @@ public class SqlSetStreamingCommand implements SqlCommand { /** Streamer flush timeout. */ private long flushFreq; + /** Ordered streamer. */ + private boolean ordered; + /** {@inheritDoc} */ @Override public SqlCommand parse(SqlLexer lex) { turnOn = parseBoolean(lex); @@ -116,6 +119,15 @@ public class SqlSetStreamingCommand implements SqlCommand { break; + case SqlKeyword.ORDERED: + lex.shift(); + + checkOffLast(lex); + + ordered = true; + + break; + default: return this; } @@ -179,6 +191,13 @@ public class SqlSetStreamingCommand implements SqlCommand { return flushFreq; } + /** + * @return {@code true} if the streamer keep the order of the statements. Otherwise returns {@code false}. + */ + public boolean isOrdered() { + return ordered; + } + /** {@inheritDoc} */ @Override public String schemaName() { return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java index 65bb599..7e699f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java @@ -28,18 +28,25 @@ public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { * */ public void testParseSetStreaming() { - parseValidate("set streaming on", true, false, 2048, 0, 0, 0); - parseValidate("set streaming 1", true, false, 2048, 0, 0, 0); - parseValidate("set streaming off", false, false, 2048, 0, 0, 0); - parseValidate("set streaming 0", false, false, 2048, 0, 0, 0); - parseValidate("set streaming on batch_size 100", true, false, 100, 0, 0, 0); - parseValidate("set streaming on flush_frequency 500", true, false, 2048, 0, 0, 500); - parseValidate("set streaming on per_node_buffer_size 100", true, false, 2048, 0, 100, 0); - parseValidate("set streaming on per_node_parallel_operations 4", true, false, 2048, 4, 0, 0); - parseValidate("set streaming on allow_overwrite on", true, true, 2048, 0, 0, 0); - parseValidate("set streaming on allow_overwrite off", true, false, 2048, 0, 0, 0); + parseValidate("set streaming on", true, false, 2048, 0, 0, 0, false); + parseValidate("set streaming 1", true, false, 2048, 0, 0, 0, false); + parseValidate("set streaming off", false, false, 2048, 0, 0, 0, false); + parseValidate("set streaming 0", false, false, 2048, 0, 0, 0, false); + parseValidate("set streaming on batch_size 100", true, false, 100, 0, 0, 0, false); + parseValidate("set streaming on flush_frequency 500", true, false, 2048, 0, 0, 500, false); + parseValidate("set streaming on per_node_buffer_size 100", true, false, 2048, 0, 100, 0, false); + parseValidate("set streaming on per_node_parallel_operations 4", true, false, 2048, 4, 0, 0, false); + parseValidate("set streaming on allow_overwrite on", true, true, 2048, 0, 0, 0, false); + parseValidate("set streaming on allow_overwrite off", true, false, 2048, 0, 0, 0, false); parseValidate("set streaming on per_node_buffer_size 50 flush_frequency 500 " + - "per_node_parallel_operations 4 allow_overwrite on batch_size 100", true, true, 100, 4, 50, 500); + "per_node_parallel_operations 4 allow_overwrite on batch_size 100", true, true, 100, 4, 50, 500, false); + + parseValidate("set streaming on ordered", true, false, 2048, 0, 0, 0, true); + parseValidate("set streaming 1 ordered", true, false, 2048, 0, 0, 0, true); + parseValidate("set streaming on batch_size 100 ordered", true, false, 100, 0, 0, 0, true); + parseValidate("set streaming on per_node_buffer_size 50 flush_frequency 500 " + + "per_node_parallel_operations 4 allow_overwrite on batch_size 100 ordered", true, true, 100, 4, 50, 500, true); + assertParseError(QueryUtils.DFLT_SCHEMA, "set", "Failed to parse SQL statement \"set[*]\": Unexpected end of command (expected: \"STREAMING\")"); @@ -84,6 +91,10 @@ public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { "Failed to parse SQL statement \"set streaming off [*]allow_overwrite\": Unexpected token: " + "\"ALLOW_OVERWRITE\""); + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off ordered", + "Failed to parse SQL statement \"set streaming off [*]ordered\": Unexpected token: " + + "\"ORDERED\""); + assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off batch_size", "Failed to parse SQL statement \"set streaming off [*]batch_size\": Unexpected token: " + "\"BATCH_SIZE\""); @@ -102,6 +113,7 @@ public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off table", "Failed to parse SQL statement \"set streaming off [*]table\": Unexpected token: \"TABLE\""); + } /** @@ -114,9 +126,10 @@ public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { * @param expParOps Expected per-node parallael operations. * @param expBufSize Expected per node buffer size. * @param expFlushFreq Expected flush frequency. + * @param ordered Ordered stream flag. */ private static void parseValidate(String sql, boolean expOn, boolean expAllowOverwrite, int expBatchSize, - int expParOps, int expBufSize, long expFlushFreq) { + int expParOps, int expBufSize, long expFlushFreq, boolean ordered) { SqlSetStreamingCommand cmd = (SqlSetStreamingCommand)new SqlParser(QueryUtils.DFLT_SCHEMA, sql).nextCommand(); assertEquals(expOn, cmd.isTurnOn()); @@ -130,5 +143,7 @@ public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest { assertEquals(expBufSize, cmd.perNodeBufferSize()); assertEquals(expFlushFreq, cmd.flushFrequency()); + + assertEquals(ordered, cmd.isOrdered()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 7b1a093..2d019d3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1561,7 +1561,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (on) cliCtx.enableStreaming(setCmd.allowOverwrite(), setCmd.flushFrequency(), - setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations()); + setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations(), setCmd.isOrdered()); else cliCtx.disableStreaming(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/yardstick/config/ignite-localhost-config.xml ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/ignite-localhost-config.xml b/modules/yardstick/config/ignite-localhost-config.xml index 3e57f4f..9b86850 100644 --- a/modules/yardstick/config/ignite-localhost-config.xml +++ b/modules/yardstick/config/ignite-localhost-config.xml @@ -27,8 +27,6 @@ <import resource="ignite-base-config.xml"/> <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" parent="base-ignite.cfg"> - <property name="localHost" value="127.0.0.1"/> - <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties ---------------------------------------------------------------------- diff --git a/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties b/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties new file mode 100644 index 0000000..e0def4b --- /dev/null +++ b/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties @@ -0,0 +1,132 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Benchmarks for data upload in inmemory mode (persistence disabled). +# + +now0=`date +'%H%M%S'` + +# JVM options. +JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false" + +# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses. +JVM_OPTS=${JVM_OPTS}" \ +-Xms8g \ +-Xmx8g \ +-Xloggc:./gc${now0}.log \ +-XX:+PrintGCDetails \ +-verbose:gc \ +-XX:+UseParNewGC \ +-XX:+UseConcMarkSweepGC \ +-XX:+PrintGCDateStamps \ +" + +#Ignite version +ver="RELEASE-" + +# List of default probes. +# Add DStatProbe or VmStatProbe if your OS supports it (e.g. if running on Linux). +BENCHMARK_DEFAULT_PROBES=TotalTimeProbe + +# Packages where the specified benchmark is searched by reflection mechanism. +BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick + +# Flag which indicates to restart the servers before every benchmark execution. +RESTART_SERVERS=true + +# Probe point writer class name. +# BENCHMARK_WRITER= + +# The benchmark is applicable only for 2 servers (the second server is started in client mode) and 1 driver. +SERVER_HOSTS=localhost,localhost +DRIVER_HOSTS=localhost + +# Remote username. +# REMOTE_USER= + +# Number of nodes, used to wait for the specified number of nodes to start. +nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo ${DRIVER_HOSTS} | tr ',' '\n' | wc -l`)) + +# Backups count. +b=1 + +# Warmup. +w=0 + +# Threads count. +t=1 + +# Sync mode. +sm=FULL_SYNC + + +# Run configuration which contains all benchmarks. +# Note that each benchmark is set to run only one time, warmup parameter is set to 0 due to custom warmup operation. +CONFIGS="\ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-256-order-on \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-local-batch-size 256 \ +, \ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-1024-order-on \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-local-batch-size 1024 \ +, \ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-4096-order-on \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-local-batch-size 4096 \ +, \ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-256-order-off \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-ordered false \ + --streamer-local-batch-size 256 \ +, \ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-1024-order-off \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-ordered false \ + --streamer-local-batch-size 1024 \ +, \ +-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b ${b} --warmup ${w} --operations 1 \ + -jdbc jdbc:ignite:thin://auto.find/ \ + --threads ${t} \ + --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds ${ver}sql-upload-inmemory-streaming-batch-4096-order-off \ + --upload-rows 1000000 -cl --clientNodesAfterId 0 \ + --use-streaming true \ + --streamer-ordered false \ + --streamer-local-batch-size 4096 \ +, \ +" http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java index 995201d..f6e8014 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java @@ -43,4 +43,9 @@ public interface StreamerParams { * @return Allow overwrite flag. */ @Nullable public Boolean streamerAllowOverwrite(); -} + + /** + * @return Ordered flag. + */ + public boolean streamerOrdered(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java index 635ba6b..7a1b116 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java @@ -74,6 +74,10 @@ public class UploadBenchmarkArguments implements StreamerParams { description = "Streamer benchmarks only: set allowOverwrite streamer parameter.") private Boolean streamerAllowOverwrite = null; + @Parameter(names = {"--streamer-ordered"}, arity = 1, + description = "Streamer benchmarks only: set streamer ordered flag.") + private boolean streamerOrdered = true; + /** How many rows to upload during warmup. */ @Parameter(names = {"--upload-warmup-rows"}) private long warmupRowsCnt = 3_000_000; @@ -111,14 +115,14 @@ public class UploadBenchmarkArguments implements StreamerParams { /** * @return Value for {@link IgniteDataStreamer#perNodeBufferSize(int)}. */ - @Nullable public Integer streamerPerNodeBufferSize() { + @Override @Nullable public Integer streamerPerNodeBufferSize() { return streamerNodeBufSize; } /** * @return Value for {@link IgniteDataStreamer#perNodeParallelOperations(int)}. */ - @Nullable public Integer streamerPerNodeParallelOperations() { + @Override @Nullable public Integer streamerPerNodeParallelOperations() { return streamerNodeParOps; } @@ -128,18 +132,25 @@ public class UploadBenchmarkArguments implements StreamerParams { * or set STREAMING sql command parameter. <br/> * If set to 1, {@link IgniteDataStreamer#addData(Object, Object)} method will be used. */ - @Nullable public Integer streamerLocalBatchSize() { + @Override @Nullable public Integer streamerLocalBatchSize() { return streamerLocBatchSize; } /** * Bypass corresponding parameter to streamer. */ - @Nullable public Boolean streamerAllowOverwrite() { + @Override @Nullable public Boolean streamerAllowOverwrite() { return streamerAllowOverwrite; } /** + * Bypass corresponding parameter to streamer. + */ + @Override public boolean streamerOrdered() { + return streamerOrdered; + } + + /** * See {@link #warmupRowsCnt}. */ public long warmupRowsCnt() { http://git-wip-us.apache.org/repos/asf/ignite/blob/01f60542/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java index 87efa83..3ff4cb4 100644 --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java @@ -208,6 +208,8 @@ public class QueryFactory { if (p.streamerPerNodeBufferSize() != null) cmd.append(" PER_NODE_BUFFER_SIZE ").append(p.streamerPerNodeBufferSize()); + cmd.append(" ORDERED ").append(p.streamerOrdered() ? "ON" : "OFF"); + return cmd.append(';').toString(); } }