Repository: ignite
Updated Branches:
  refs/heads/ignite-2.5 46dac58cd -> f68ee153d


http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index cf0e98b..7ed3e1f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -28,19 +28,23 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import 
org.apache.ignite.internal.processors.authentication.AuthorizationContext;
 import 
org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
-import 
org.apache.ignite.internal.processors.authentication.AuthorizationContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
@@ -60,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
@@ -68,6 +73,7 @@ import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchR
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
+import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC_ORDERED;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
 import static 
org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES;
@@ -108,6 +114,15 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
     /** Current bulk load processors. */
     private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> 
bulkLoadRequests = new ConcurrentHashMap<>();
 
+    /** Ordered batches queue. */
+    private final PriorityQueue<JdbcOrderedBatchExecuteRequest> 
orderedBatchesQueue = new PriorityQueue<>();
+
+    /** Ordered batches mutex. */
+    private final Object orderedBatchesMux = new Object();
+
+    /** Response sender. */
+    private final JdbcResponseSender sender;
+
     /** Automatic close of cursors. */
     private final boolean autoCloseCursors;
 
@@ -121,6 +136,7 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * Constructor.
      * @param ctx Context.
      * @param busyLock Shutdown latch.
+     * @param sender Results sender.
      * @param maxCursors Maximum allowed cursors.
      * @param distributedJoins Distributed joins flag.
      * @param enforceJoinOrder Enforce join order flag.
@@ -132,14 +148,23 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
      * @param actx Authentication context.
      * @param protocolVer Protocol version.
      */
-    public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock 
busyLock, int maxCursors,
+    public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock,
+        JdbcResponseSender sender, int maxCursors,
         boolean distributedJoins, boolean enforceJoinOrder, boolean 
collocated, boolean replicatedOnly,
         boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate,
         AuthorizationContext actx, ClientListenerProtocolVersion protocolVer) {
         this.ctx = ctx;
+        this.sender = sender;
+
+        Factory<GridWorker> orderedFactory = new Factory<GridWorker>() {
+            @Override public GridWorker create() {
+                return new OrderedBatchWorker();
+            }
+        };
 
         this.cliCtx = new SqlClientContext(
             ctx,
+            orderedFactory,
             distributedJoins,
             enforceJoinOrder,
             collocated,
@@ -189,6 +214,9 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
                 case BATCH_EXEC:
                     return executeBatch((JdbcBatchExecuteRequest)req);
 
+                case BATCH_EXEC_ORDERED:
+                    return 
dispatchBatchOrdered((JdbcOrderedBatchExecuteRequest)req);
+
                 case META_TABLES:
                     return getTablesMeta((JdbcMetaTablesRequest)req);
 
@@ -222,6 +250,55 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
     }
 
     /**
+     * @param req Ordered batch request.
+     * @return Response.
+     */
+    private ClientListenerResponse 
dispatchBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
+        synchronized (orderedBatchesMux) {
+            orderedBatchesQueue.add(req);
+
+            orderedBatchesMux.notify();
+        }
+
+        if (!cliCtx.isStreamOrdered())
+            executeBatchOrdered(req);
+
+        return null;
+    }
+
+    /**
+     * @param req Ordered batch request.
+     * @return Response.
+     */
+    private ClientListenerResponse 
executeBatchOrdered(JdbcOrderedBatchExecuteRequest req) {
+        try {
+            if (req.isLastStreamBatch())
+                cliCtx.waitTotalProcessedOrderedRequests(req.order());
+
+            JdbcResponse resp = (JdbcResponse)executeBatch(req);
+
+            if (resp.response() instanceof JdbcBatchExecuteResult) {
+                resp = new JdbcResponse(
+                    new 
JdbcOrderedBatchExecuteResult((JdbcBatchExecuteResult)resp.response(), 
req.order()));
+            }
+
+            sender.send(resp);
+        } catch (Exception e) {
+            U.error(null, "Error processing file batch", e);
+
+            sender.send(new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server 
error: " + e));
+        }
+
+        synchronized (orderedBatchesMux) {
+            orderedBatchesQueue.poll();
+        }
+
+        cliCtx.orderedRequestProcessed();
+
+        return null;
+    }
+
+    /**
      * Processes a file batch sent from client as part of bulk load COPY 
command.
      *
      * @param req Request object with a batch of a file received from client.
@@ -938,4 +1015,42 @@ public class JdbcRequestHandler implements 
ClientListenerRequestHandler {
         else
             return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, 
e.toString());
     }
+
+    /**
+     * Ordered batch worker.
+     */
+    private class OrderedBatchWorker extends GridWorker {
+        /**
+         * Constructor.
+         */
+        OrderedBatchWorker() {
+            super(ctx.igniteInstanceName(), "ordered-batch", 
JdbcRequestHandler.this.log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            long nextBatchOrder = 0;
+
+            while (true) {
+                if (!cliCtx.isStream())
+                    return;
+
+                JdbcOrderedBatchExecuteRequest req;
+
+                synchronized (orderedBatchesMux) {
+                    req = orderedBatchesQueue.peek();
+
+                    if (req == null || req.order() != nextBatchOrder) {
+                        orderedBatchesMux.wait();
+
+                        continue;
+                    }
+                }
+
+                executeBatchOrdered(req);
+
+                nextBatchOrder++;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java
new file mode 100644
index 0000000..128bcee
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponseSender.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+
+/**
+ * JDBC response result sender.
+ */
+public interface JdbcResponseSender {
+    /**
+     * Send response to the client. Used for asynchronous result send.
+     * @param resp JDBC response.
+     */
+    public void send(ClientListenerResponse resp);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
index 43631e9..556917c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -68,6 +68,9 @@ public class JdbcResult implements JdbcRawBinarylizable {
     /** A request to send file from client to server. */
     static final byte BULK_LOAD_ACK = 16;
 
+    /** A result of processing an ordered batch request. */
+    static final byte BATCH_EXEC_ORDERED = 18;
+
     /** Success status. */
     private byte type;
 
@@ -171,6 +174,16 @@ public class JdbcResult implements JdbcRawBinarylizable {
 
                 break;
 
+            case META_COLUMNS_V4:
+                res = new JdbcMetaColumnsResultV4();
+
+                break;
+
+            case BATCH_EXEC_ORDERED:
+                res = new JdbcOrderedBatchExecuteResult();
+
+                break;
+
             default:
                 throw new IgniteException("Unknown SQL listener request ID: 
[request ID=" + resId + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
index e8c2932..42dbae6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
@@ -20,10 +20,14 @@ package org.apache.ignite.internal.processors.query;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import javax.cache.configuration.Factory;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
 
 /**
  * Container for connection properties passed by various drivers (JDBC 
drivers, possibly ODBC) having notion of an
@@ -53,6 +57,9 @@ public class SqlClientContext implements AutoCloseable {
     /** Skip reducer on update flag. */
     private final boolean skipReducerOnUpdate;
 
+    /** Monitor. */
+    private final Object mux = new Object();
+
     /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
     private boolean streamAllowOverwrite;
 
@@ -65,14 +72,28 @@ public class SqlClientContext implements AutoCloseable {
     /** Auto flush frequency for streaming. */
     private long streamFlushTimeout;
 
+    /** Stream ordered. */
+    private boolean streamOrdered;
+
     /** Streamers for various caches. */
-    private Map<String, IgniteDataStreamer<?, ?>> streamers;
+    private volatile Map<String, IgniteDataStreamer<?, ?>> streamers;
+
+    /** Ordered batch thread. */
+    private IgniteThread orderedBatchThread;
+
+    /** Ordered batch worker factory. */
+    private Factory<GridWorker> orderedBatchWorkerFactory;
+
+    /** Count of the processed ordered batch requests. Used to wait for the end of 
processing of all requests before starting
+     * to process the last request. */
+    private long totalProcessedOrderedReqs;
 
     /** Logger. */
     private final IgniteLogger log;
 
     /**
      * @param ctx Kernal context.
+     * @param orderedBatchWorkerFactory Ordered batch worker factory.
      * @param distributedJoins Distributed joins flag.
      * @param enforceJoinOrder Enforce join order flag.
      * @param collocated Collocated flag.
@@ -80,9 +101,11 @@ public class SqlClientContext implements AutoCloseable {
      * @param lazy Lazy query execution flag.
      * @param skipReducerOnUpdate Skip reducer on update flag.
      */
-    public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, 
boolean enforceJoinOrder,
+    public SqlClientContext(GridKernalContext ctx, Factory<GridWorker> 
orderedBatchWorkerFactory,
+        boolean distributedJoins, boolean enforceJoinOrder,
         boolean collocated, boolean replicatedOnly, boolean lazy, boolean 
skipReducerOnUpdate) {
         this.ctx = ctx;
+        this.orderedBatchWorkerFactory = orderedBatchWorkerFactory;
         this.distributedJoins = distributedJoins;
         this.enforceJoinOrder = enforceJoinOrder;
         this.collocated = collocated;
@@ -100,37 +123,52 @@ public class SqlClientContext implements AutoCloseable {
      * @param flushFreq Flush frequency for streamers.
      * @param perNodeBufSize Per node streaming buffer size.
      * @param perNodeParOps Per node streaming parallel operations number.
+     * @param ordered Ordered stream flag.
      */
-    public void enableStreaming(boolean allowOverwrite, long flushFreq, int 
perNodeBufSize, int perNodeParOps) {
-        if (isStream())
-            return;
+    public void enableStreaming(boolean allowOverwrite, long flushFreq, int 
perNodeBufSize,
+        int perNodeParOps, boolean ordered) {
+        synchronized (mux) {
+            if (isStream())
+                return;
+
+            streamers = new HashMap<>();
 
-        streamers = new HashMap<>();
+            this.streamAllowOverwrite = allowOverwrite;
+            this.streamFlushTimeout = flushFreq;
+            this.streamNodeBufSize = perNodeBufSize;
+            this.streamNodeParOps = perNodeParOps;
+            this.streamOrdered = ordered;
 
-        this.streamAllowOverwrite = allowOverwrite;
-        this.streamFlushTimeout = flushFreq;
-        this.streamNodeBufSize = perNodeBufSize;
-        this.streamNodeParOps = perNodeParOps;
+            if (ordered) {
+                orderedBatchThread = new 
IgniteThread(orderedBatchWorkerFactory.create());
+
+                orderedBatchThread.start();
+            }
+        }
     }
 
     /**
      * Turn off streaming on this client context - with closing all open 
streamers, if any.
      */
     public void disableStreaming() {
-        if (!isStream())
-            return;
+        synchronized (mux) {
+            if (!isStream())
+                return;
 
-        Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator();
+            Iterator<IgniteDataStreamer<?, ?>> it = 
streamers.values().iterator();
 
-        while (it.hasNext()) {
-            IgniteDataStreamer<?, ?> streamer = it.next();
+            while (it.hasNext()) {
+                IgniteDataStreamer<?, ?> streamer = it.next();
 
-            U.close(streamer, log);
+                U.close(streamer, log);
 
-            it.remove();
-        }
+                it.remove();
+            }
+
+            streamers = null;
 
-        streamers = null;
+            orderedBatchThread = null;
+        }
     }
 
     /**
@@ -179,7 +217,18 @@ public class SqlClientContext implements AutoCloseable {
      * @return Streaming state flag (on or off).
      */
     public boolean isStream() {
-        return streamers != null;
+        synchronized (mux) {
+            return streamers != null;
+        }
+    }
+
+    /**
+     * @return Stream ordered flag.
+     */
+    public boolean isStreamOrdered() {
+        synchronized (mux) {
+            return streamOrdered;
+        }
     }
 
     /**
@@ -187,29 +236,59 @@ public class SqlClientContext implements AutoCloseable {
      * @return Streamer for given cache.
      */
     public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) {
-        if (streamers == null)
-            return null;
+        synchronized (mux) {
+            if (streamers == null)
+                return null;
 
-        IgniteDataStreamer<?, ?> res = streamers.get(cacheName);
+            IgniteDataStreamer<?, ?> res = streamers.get(cacheName);
 
-        if (res != null)
-            return res;
+            if (res != null)
+                return res;
 
-        res = ctx.grid().dataStreamer(cacheName);
+            res = ctx.grid().dataStreamer(cacheName);
 
-        res.autoFlushFrequency(streamFlushTimeout);
+            res.autoFlushFrequency(streamFlushTimeout);
 
-        res.allowOverwrite(streamAllowOverwrite);
+            res.allowOverwrite(streamAllowOverwrite);
 
-        if (streamNodeBufSize > 0)
-            res.perNodeBufferSize(streamNodeBufSize);
+            if (streamNodeBufSize > 0)
+                res.perNodeBufferSize(streamNodeBufSize);
 
-        if (streamNodeParOps > 0)
-            res.perNodeParallelOperations(streamNodeParOps);
+            if (streamNodeParOps > 0)
+                res.perNodeParallelOperations(streamNodeParOps);
 
-        streamers.put(cacheName, res);
+            streamers.put(cacheName, res);
 
-        return res;
+            return res;
+        }
+    }
+
+    /**
+     * Waits until the total count of processed ordered requests reaches the 
specified value.
+     * @param total Expected total number of processed requests.
+     */
+    public void waitTotalProcessedOrderedRequests(long total) {
+        synchronized (mux) {
+            while (totalProcessedOrderedReqs < total) {
+                try {
+                    mux.wait();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException("Waiting for end of processing 
the last batch is interrupted", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Increments the count of processed ordered requests and notifies a waiting thread.
+     */
+    public void orderedRequestProcessed() {
+        synchronized (mux) {
+            totalProcessedOrderedReqs++;
+
+            mux.notify();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
index 0fd08f4..be76482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
@@ -185,6 +185,9 @@ public class SqlKeyword {
     /** Keyword: ON. */
     public static final String ON = "ON";
 
+    /** Keyword: ORDERED. */
+    public static final String ORDERED = "ORDERED";
+
     /** Keyword: PER_NODE_PARALLEL_OPERATIONS. */
     public static final String PER_NODE_PARALLEL_OPERATIONS = 
"PER_NODE_PARALLEL_OPERATIONS";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
index c492c61..42f17d5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
@@ -53,6 +53,9 @@ public class SqlSetStreamingCommand implements SqlCommand {
     /** Streamer flush timeout. */
     private long flushFreq;
 
+    /** Ordered streamer. */
+    private boolean ordered;
+
     /** {@inheritDoc} */
     @Override public SqlCommand parse(SqlLexer lex) {
         turnOn = parseBoolean(lex);
@@ -116,6 +119,15 @@ public class SqlSetStreamingCommand implements SqlCommand {
 
                     break;
 
+                case SqlKeyword.ORDERED:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    ordered = true;
+
+                    break;
+
                 default:
                     return this;
             }
@@ -179,6 +191,13 @@ public class SqlSetStreamingCommand implements SqlCommand {
         return flushFreq;
     }
 
+    /**
+     * @return {@code true} if the streamer keeps the order of the statements, 
{@code false} otherwise.
+     */
+    public boolean isOrdered() {
+        return ordered;
+    }
+
     /** {@inheritDoc} */
     @Override public String schemaName() {
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
index 65bb599..7e699f6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
@@ -28,18 +28,25 @@ public class SqlParserSetStreamingSelfTest extends 
SqlParserAbstractSelfTest {
      *
      */
     public void testParseSetStreaming() {
-        parseValidate("set streaming on", true, false, 2048, 0, 0, 0);
-        parseValidate("set streaming 1", true, false, 2048, 0, 0, 0);
-        parseValidate("set streaming off", false, false, 2048, 0, 0, 0);
-        parseValidate("set streaming 0", false, false, 2048, 0, 0, 0);
-        parseValidate("set streaming on batch_size 100", true, false, 100, 0, 
0, 0);
-        parseValidate("set streaming on flush_frequency 500", true, false, 
2048, 0, 0, 500);
-        parseValidate("set streaming on per_node_buffer_size 100", true, 
false, 2048, 0, 100, 0);
-        parseValidate("set streaming on per_node_parallel_operations 4", true, 
false, 2048, 4, 0, 0);
-        parseValidate("set streaming on allow_overwrite on", true, true, 2048, 
0, 0, 0);
-        parseValidate("set streaming on allow_overwrite off", true, false, 
2048, 0, 0, 0);
+        parseValidate("set streaming on", true, false, 2048, 0, 0, 0, false);
+        parseValidate("set streaming 1", true, false, 2048, 0, 0, 0, false);
+        parseValidate("set streaming off", false, false, 2048, 0, 0, 0, false);
+        parseValidate("set streaming 0", false, false, 2048, 0, 0, 0, false);
+        parseValidate("set streaming on batch_size 100", true, false, 100, 0, 
0, 0, false);
+        parseValidate("set streaming on flush_frequency 500", true, false, 
2048, 0, 0, 500, false);
+        parseValidate("set streaming on per_node_buffer_size 100", true, 
false, 2048, 0, 100, 0, false);
+        parseValidate("set streaming on per_node_parallel_operations 4", true, 
false, 2048, 4, 0, 0, false);
+        parseValidate("set streaming on allow_overwrite on", true, true, 2048, 
0, 0, 0, false);
+        parseValidate("set streaming on allow_overwrite off", true, false, 
2048, 0, 0, 0, false);
         parseValidate("set streaming on per_node_buffer_size 50 
flush_frequency 500 " +
-            "per_node_parallel_operations 4 allow_overwrite on batch_size 
100", true, true, 100, 4, 50, 500);
+            "per_node_parallel_operations 4 allow_overwrite on batch_size 
100", true, true, 100, 4, 50, 500, false);
+
+        parseValidate("set streaming on ordered", true, false, 2048, 0, 0, 0, 
true);
+        parseValidate("set streaming 1 ordered", true, false, 2048, 0, 0, 0, 
true);
+        parseValidate("set streaming on batch_size 100 ordered", true, false, 
100, 0, 0, 0, true);
+        parseValidate("set streaming on per_node_buffer_size 50 
flush_frequency 500 " +
+            "per_node_parallel_operations 4 allow_overwrite on batch_size 100 
ordered", true, true, 100, 4, 50, 500, true);
+
 
         assertParseError(QueryUtils.DFLT_SCHEMA, "set",
             "Failed to parse SQL statement \"set[*]\": Unexpected end of 
command (expected: \"STREAMING\")");
@@ -84,6 +91,10 @@ public class SqlParserSetStreamingSelfTest extends 
SqlParserAbstractSelfTest {
             "Failed to parse SQL statement \"set streaming off 
[*]allow_overwrite\": Unexpected token: " +
                 "\"ALLOW_OVERWRITE\"");
 
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off ordered",
+            "Failed to parse SQL statement \"set streaming off [*]ordered\": 
Unexpected token: " +
+                "\"ORDERED\"");
+
         assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off 
batch_size",
             "Failed to parse SQL statement \"set streaming off 
[*]batch_size\": Unexpected token: " +
                 "\"BATCH_SIZE\"");
@@ -102,6 +113,7 @@ public class SqlParserSetStreamingSelfTest extends 
SqlParserAbstractSelfTest {
 
         assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off table",
             "Failed to parse SQL statement \"set streaming off [*]table\": 
Unexpected token: \"TABLE\"");
+
     }
 
     /**
@@ -114,9 +126,10 @@ public class SqlParserSetStreamingSelfTest extends 
SqlParserAbstractSelfTest {
      * @param expParOps Expected per-node parallael operations.
      * @param expBufSize Expected per node buffer size.
      * @param expFlushFreq Expected flush frequency.
+     * @param ordered Ordered stream flag.
      */
     private static void parseValidate(String sql, boolean expOn, boolean 
expAllowOverwrite, int expBatchSize,
-        int expParOps, int expBufSize, long expFlushFreq) {
+        int expParOps, int expBufSize, long expFlushFreq, boolean ordered) {
         SqlSetStreamingCommand cmd = (SqlSetStreamingCommand)new 
SqlParser(QueryUtils.DFLT_SCHEMA, sql).nextCommand();
 
         assertEquals(expOn, cmd.isTurnOn());
@@ -130,5 +143,7 @@ public class SqlParserSetStreamingSelfTest extends 
SqlParserAbstractSelfTest {
         assertEquals(expBufSize, cmd.perNodeBufferSize());
 
         assertEquals(expFlushFreq, cmd.flushFrequency());
+
+        assertEquals(ordered, cmd.isOrdered());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7b1a093..2d019d3 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1561,7 +1561,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
             if (on)
                 cliCtx.enableStreaming(setCmd.allowOverwrite(), 
setCmd.flushFrequency(),
-                    setCmd.perNodeBufferSize(), 
setCmd.perNodeParallelOperations());
+                    setCmd.perNodeBufferSize(), 
setCmd.perNodeParallelOperations(), setCmd.isOrdered());
             else
                 cliCtx.disableStreaming();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/yardstick/config/ignite-localhost-config.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/config/ignite-localhost-config.xml 
b/modules/yardstick/config/ignite-localhost-config.xml
index 3e57f4f..9b86850 100644
--- a/modules/yardstick/config/ignite-localhost-config.xml
+++ b/modules/yardstick/config/ignite-localhost-config.xml
@@ -27,8 +27,6 @@
     <import resource="ignite-base-config.xml"/>
 
     <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration" 
parent="base-ignite.cfg">
-        <property name="localHost" value="127.0.0.1"/>
-
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties
----------------------------------------------------------------------
diff --git 
a/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties 
b/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties
new file mode 100644
index 0000000..e0def4b
--- /dev/null
+++ b/modules/yardstick/config/upload/benchmark-jdbc-thin-streaming.properties
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Benchmarks for data upload in inmemory mode (persistence disabled).
+#
+
+now0=`date +'%H%M%S'`
+
+# JVM options.
+JVM_OPTS=${JVM_OPTS}" -DIGNITE_QUIET=false"
+
+# Uncomment to enable concurrent garbage collection (GC) if you encounter long GC pauses.
+JVM_OPTS=${JVM_OPTS}" \
+-Xms8g \
+-Xmx8g \
+-Xloggc:./gc${now0}.log \
+-XX:+PrintGCDetails \
+-verbose:gc \
+-XX:+UseParNewGC \
+-XX:+UseConcMarkSweepGC \
+-XX:+PrintGCDateStamps \
+"
+
+#Ignite version
+ver="RELEASE-"
+
+# List of default probes.
+# Add DStatProbe or VmStatProbe if your OS supports it (e.g. if running on Linux).
+BENCHMARK_DEFAULT_PROBES=TotalTimeProbe
+
+# Packages where the specified benchmark is searched by reflection mechanism.
+BENCHMARK_PACKAGES=org.yardstickframework,org.apache.ignite.yardstick
+
+# Flag which indicates to restart the servers before every benchmark execution.
+RESTART_SERVERS=true
+
+# Probe point writer class name.
+# BENCHMARK_WRITER=
+
+# The benchmark is applicable only for 2 servers (the second server is started in client mode) and 1 driver.
+SERVER_HOSTS=localhost,localhost
+DRIVER_HOSTS=localhost
+
+# Remote username.
+# REMOTE_USER=
+
+# Number of nodes, used to wait for the specified number of nodes to start.
+nodesNum=$((`echo ${SERVER_HOSTS} | tr ',' '\n' | wc -l` + `echo 
${DRIVER_HOSTS} | tr ',' '\n' | wc -l`))
+
+# Backups count.
+b=1
+
+# Warmup.
+w=0
+
+# Threads count.
+t=1
+
+# Sync mode.
+sm=FULL_SYNC
+
+
+# Run configuration which contains all benchmarks.
+# Note that each benchmark is set to run only one time, warmup parameter is set to 0 due to custom warmup operation.
+CONFIGS="\
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-256-order-on \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-local-batch-size 256 \
+, \
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-1024-order-on \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-local-batch-size 1024 \
+, \
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-4096-order-on \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-local-batch-size 4096 \
+, \
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-256-order-off \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-ordered false \
+  --streamer-local-batch-size 256 \
+, \
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-1024-order-off \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-ordered false \
+  --streamer-local-batch-size 1024 \
+, \
+-cfg ${SCRIPT_DIR}/../config/ignite-localhost-config.xml -nn ${nodesNum} -b 
${b} --warmup ${w} --operations 1 \
+  -jdbc jdbc:ignite:thin://auto.find/ \
+  --threads ${t} \
+  --syncMode ${sm} -dn InsertBenchmark -sn IgniteNode -ds 
${ver}sql-upload-inmemory-streaming-batch-4096-order-off \
+  --upload-rows 1000000 -cl --clientNodesAfterId 0 \
+  --use-streaming true \
+  --streamer-ordered false \
+  --streamer-local-batch-size 4096 \
+, \
+"

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java
----------------------------------------------------------------------
diff --git 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java
 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java
index 995201d..f6e8014 100644
--- 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java
+++ 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/StreamerParams.java
@@ -43,4 +43,9 @@ public interface StreamerParams {
      * @return Allow overwrite flag.
      */
     @Nullable public Boolean streamerAllowOverwrite();
-}
+
+    /**
+     * @return Ordered flag.
+     */
+    public boolean streamerOrdered();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java
----------------------------------------------------------------------
diff --git 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java
 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java
index 635ba6b..7a1b116 100644
--- 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java
+++ 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/UploadBenchmarkArguments.java
@@ -74,6 +74,10 @@ public class UploadBenchmarkArguments implements 
StreamerParams {
         description = "Streamer benchmarks only: set allowOverwrite streamer 
parameter.")
     private Boolean streamerAllowOverwrite = null;
 
+    @Parameter(names = {"--streamer-ordered"}, arity = 1,
+        description = "Streamer benchmarks only: set streamer ordered flag.")
+    private boolean streamerOrdered = true;
+
     /** How many rows to upload during warmup. */
     @Parameter(names = {"--upload-warmup-rows"})
     private long warmupRowsCnt = 3_000_000;
@@ -111,14 +115,14 @@ public class UploadBenchmarkArguments implements 
StreamerParams {
     /**
      * @return Value for {@link IgniteDataStreamer#perNodeBufferSize(int)}.
      */
-    @Nullable public Integer streamerPerNodeBufferSize() {
+    @Override @Nullable public Integer streamerPerNodeBufferSize() {
         return streamerNodeBufSize;
     }
 
     /**
      * @return Value for {@link IgniteDataStreamer#perNodeParallelOperations(int)}.
      */
-    @Nullable public Integer streamerPerNodeParallelOperations() {
+    @Override @Nullable public Integer streamerPerNodeParallelOperations() {
         return streamerNodeParOps;
     }
 
@@ -128,18 +132,25 @@ public class UploadBenchmarkArguments implements 
StreamerParams {
      * or set STREAMING sql command parameter. <br/>
      * If set to 1, {@link IgniteDataStreamer#addData(Object, Object)} method will be used.
      */
-    @Nullable public Integer streamerLocalBatchSize() {
+    @Override @Nullable public Integer streamerLocalBatchSize() {
         return streamerLocBatchSize;
     }
 
     /**
      * Bypass corresponding parameter to streamer.
      */
-    @Nullable public Boolean streamerAllowOverwrite() {
+    @Override @Nullable public Boolean streamerAllowOverwrite() {
         return streamerAllowOverwrite;
     }
 
     /**
+     * Bypass corresponding parameter to streamer.
+     */
+    @Override public boolean streamerOrdered() {
+        return streamerOrdered;
+    }
+
+    /**
      * See {@link #warmupRowsCnt}.
      */
     public long warmupRowsCnt() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d05c28ca/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java
 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java
index 87efa83..3ff4cb4 100644
--- 
a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java
+++ 
b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/upload/model/QueryFactory.java
@@ -208,6 +208,8 @@ public class QueryFactory {
         if (p.streamerPerNodeBufferSize() != null)
             cmd.append(" PER_NODE_BUFFER_SIZE 
").append(p.streamerPerNodeBufferSize());
 
+        cmd.append(" ORDERED ").append(p.streamerOrdered() ? "ON" : "OFF");
+
         return cmd.append(';').toString();
     }
 }

Reply via email to