Repository: ignite
Updated Branches:
  refs/heads/ignite-2.7 ba7833c05 -> 9f8d331dc


http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h 
b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
new file mode 100644
index 0000000..701bb3c
--- /dev/null
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_SQL_SQL_UTILS
+#define _IGNITE_ODBC_SQL_SQL_UTILS
+
+#include <string>
+
+#include <ignite/odbc/odbc_error.h>
+#include <ignite/odbc/sql/sql_token.h>
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace sql_utils
+        {
+            /**
+             * Parse token to boolean value.
+             *
+             * @return Boolean value, or an error if the token is not ON, OFF, 1 or 0.
+             */
+            inline OdbcExpected<bool> TokenToBoolean(const SqlToken& token)
+            {
+                std::string lower = token.ToLower();
+
+                if (lower == "1" || lower == "on")
+                    return true;
+
+                if (lower == "0" || lower == "off")
+                    return false;
+
+                return 
OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                    "Unexpected token: '" + token.ToString() + "', ON, OFF, 1 
or 0 expected.");
+            }
+
+            /**
+             * Check if the SQL is internal command.
+             *
+             * @param sql SQL request string.
+             * @return @c true if internal.
+             */
+            bool IsInternalCommand(const std::string& sql);
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_SQL_SQL_UTILS
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h 
b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
index 31d07de..56eea6c 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
@@ -29,6 +29,7 @@
 #include "ignite/odbc/app/parameter_set.h"
 #include "ignite/odbc/diagnostic/diagnosable_adapter.h"
 #include "ignite/odbc/common_types.h"
+#include "sql/sql_set_streaming_command.h"
 
 namespace ignite
 {
@@ -454,6 +455,28 @@ namespace ignite
             SqlResult::Type InternalClose();
 
             /**
+             * Stop streaming.
+             *
+             * @return Operation result.
+             */
+            SqlResult::Type StopStreaming();
+
+            /**
+             * Process internal SQL command.
+             *
+             * @param query SQL query.
+             * @return Operation result.
+             */
+            SqlResult::Type ProcessInternalCommand(const std::string& query);
+
+            /**
+             * Check if the streaming is active currently.
+             *
+             * @return @c true, if the streaming is active.
+             */
+            bool IsStreamingActive() const;
+
+            /**
              * Prepare SQL query.
              *
              * @param query SQL query.
@@ -477,6 +500,13 @@ namespace ignite
             SqlResult::Type InternalExecuteSqlQuery();
 
             /**
+             * Process internal query.
+             *
+             * @return Operation result.
+             */
+            SqlResult::Type ProcessInternalQuery();
+
+            /**
              * Fetch query result row with offset
              * @param orientation Fetch type
              * @param offset Fetch offset

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h 
b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
new file mode 100644
index 0000000..752dd63
--- /dev/null
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_STREAMING_STREAMING_BATCH
+#define _IGNITE_ODBC_STREAMING_STREAMING_BATCH
+
+#include <string>
+
+#include "ignite/impl/interop/interop_memory.h"
+
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace app
+        {
+            // Forward declaration.
+            class ParameterSet;
+        }
+        
+        namespace streaming
+        {
+            /**
+             * Streaming batch.
+             *
+             * Accumulates data for streaming.
+             */
+            class StreamingBatch
+            {
+            public:
+                /**
+                 * Default constructor.
+                 */
+                StreamingBatch();
+
+                /**
+                 * Destructor.
+                 */
+                ~StreamingBatch();
+
+                /**
+                 * Add another row to a batch.
+                 *
+                 * @param sql Sql.
+                 * @param params Parameters.
+                 */
+                void AddRow(const std::string& sql, const app::ParameterSet& 
params);
+
+                /**
+                 * Clear the batch data. 
+                 */
+                void Clear();
+
+                /**
+                 * Get data.
+                 *
+                 * @return Data.
+                 */
+                const int8_t* GetData() const
+                {
+                    return data.Data();
+                }
+
+                /**
+                 * Get data length.
+                 *
+                 * @return Data length.
+                 */
+                int32_t GetDataLength() const
+                {
+                    return data.Length();
+                }
+
+                /**
+                 * Get number of rows in batch.
+                 *
+                 * @return Number of rows in batch.
+                 */
+                int32_t GetSize() const
+                {
+                    return size;
+                }
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(StreamingBatch);
+
+                /** Current SQL. */
+                std::string currentSql;
+
+                /** Batch size in rows. */
+                int32_t size;
+
+                /** Batch data. */
+                impl::interop::InteropUnpooledMemory data;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_STREAMING_STREAMING_BATCH

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h 
b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
new file mode 100644
index 0000000..2852596
--- /dev/null
+++ 
b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_STREAMING_STREAMING_CONTEXT
+#define _IGNITE_ODBC_STREAMING_STREAMING_CONTEXT
+
+#include "ignite/odbc/query/query.h"
+#include "ignite/odbc/app/parameter_set.h"
+
+#include "ignite/odbc/streaming/streaming_batch.h"
+
+namespace ignite
+{
+    namespace odbc
+    {
+        /** Set streaming forward-declaration. */
+        class SqlSetStreamingCommand;
+
+        /** Connection forward-declaration. */
+        class Connection;
+
+        namespace streaming
+        {
+            /**
+             * Streaming context.
+             */
+            class StreamingContext
+            {
+            public:
+                /**
+                 * Default constructor.
+                 */
+                StreamingContext();
+
+                /**
+                 * Set connection for streaming.
+                 *
+                 * @param connection Connection for streaming.
+                 */
+                void SetConnection(Connection& connection)
+                {
+                    this->connection = &connection;
+                }
+
+                /**
+                 * Destructor.
+                 */
+                ~StreamingContext();
+
+                /**
+                 * Enable streaming.
+                 *
+                 * @param cmd Set streaming command.
+                 * @return Result.
+                 */
+                SqlResult::Type Enable(const SqlSetStreamingCommand& cmd);
+
+                /**
+                 * Disable streaming.
+                 *
+                 * @return Result.
+                 */
+                SqlResult::Type Disable();
+
+                /**
+                 * Check if the streaming is enabled.
+                 *
+                 * @return @c true if enabled.
+                 */
+                bool IsEnabled() const
+                {
+                    return enabled;
+                }
+
+                /**
+                 * Execute query.
+                 *
+                 * @param sql SQL.
+                 * @param params SQL params.
+                 * @return Operation result.
+                 */
+                SqlResult::Type Execute(const std::string& sql, const 
app::ParameterSet& params);
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(StreamingContext);
+
+                /**
+                 * Flush collected streaming data to remote server.
+                 *
+                 * @param last Last page indicator.
+                 * @return Operation result.
+                 */
+                SqlResult::Type Flush(bool last);
+
+                /**
+                 * Send batch request.
+                 *
+                 * @param last Last page flag.
+                 * @return Result.
+                 */
+                SqlResult::Type MakeRequestStreamingBatch(bool last);
+
+                /** Connection used for streaming. */
+                Connection* connection;
+
+                /** Batch size. */
+                int32_t batchSize;
+
+                /** Order. */
+                int64_t order;
+
+                /** Streaming enabled. */
+                bool enabled;
+
+                /** Current batch. */
+                StreamingBatch currentBatch;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_STREAMING_STREAMING_CONTEXT

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp 
b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
index e3bbb4e..812d56f 100644
--- 
a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
+++ 
b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
@@ -160,17 +160,17 @@ namespace ignite
 
                     const ProtocolVersion::VersionSet& supported = 
ProtocolVersion::GetSupported();
 
-                    ProtocolVersion version = ProtocolVersion::GetCurrent();
-                    ProtocolVersion::VersionSet::const_iterator it;
-                    for (it = supported.begin(); it != supported.end(); ++it)
+                    ProtocolVersion version = config.GetProtocolVersion();
+
+                    if (!version.IsSupported())
+                        version = ProtocolVersion::GetCurrent();
+
+                    for (ProtocolVersion::VersionSet::const_iterator it = 
supported.begin(); it != supported.end(); ++it)
                     {
                         protocolVersionComboBox->AddString(it->ToString());
 
-                        if (*it == config.GetProtocolVersion())
-                        {
+                        if (*it == version)
                             protocolVersionComboBox->SetSelection(id);
-                            version = *it;
-                        }
 
                         ++id;
                     }
@@ -296,7 +296,10 @@ namespace ignite
 
                     int checkBoxSize = (sizeX - 3 * INTERVAL) / 2;
 
-                    const ProtocolVersion version = 
config.GetProtocolVersion();
+                    ProtocolVersion version = config.GetProtocolVersion();
+
+                    if (!version.IsSupported())
+                        version = ProtocolVersion::GetCurrent();
 
                     int rowPos = posY + 2 * INTERVAL;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj 
b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
index 630dc27..69318ef 100644
--- a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
+++ b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj
@@ -191,14 +191,21 @@
     <ClCompile Include="..\..\src\query\foreign_keys_query.cpp" />
     <ClCompile Include="..\..\src\query\primary_keys_query.cpp" />
     <ClCompile Include="..\..\src\query\special_columns_query.cpp" />
+    <ClCompile Include="..\..\src\query\streaming_query.cpp" />
     <ClCompile Include="..\..\src\query\table_metadata_query.cpp" />
     <ClCompile Include="..\..\src\query\type_info_query.cpp" />
     <ClCompile Include="..\..\src\result_page.cpp" />
     <ClCompile Include="..\..\src\row.cpp" />
+    <ClCompile Include="..\..\src\sql\sql_lexer.cpp" />
+    <ClCompile Include="..\..\src\sql\sql_parser.cpp" />
+    <ClCompile Include="..\..\src\sql\sql_set_streaming_command.cpp" />
+    <ClCompile Include="..\..\src\sql\sql_utils.cpp" />
     <ClCompile Include="..\..\src\ssl\secure_socket_client.cpp" />
     <ClCompile Include="..\..\src\ssl\ssl_gateway.cpp" />
     <ClCompile Include="..\..\src\ssl\ssl_mode.cpp" />
     <ClCompile Include="..\..\src\statement.cpp" />
+    <ClCompile Include="..\..\src\streaming\streaming_batch.cpp" />
+    <ClCompile Include="..\..\src\streaming\streaming_context.cpp" />
     <ClCompile Include="..\..\src\type_traits.cpp" />
     <ClCompile Include="..\..\src\utility.cpp" />
     <ClCompile Include="..\..\src\log.cpp" />
@@ -239,19 +246,29 @@
     <ClInclude Include="..\..\include\ignite\odbc\query\data_query.h" />
     <ClInclude 
Include="..\..\include\ignite\odbc\query\column_metadata_query.h" />
     <ClInclude Include="..\..\include\ignite\odbc\query\foreign_keys_query.h" 
/>
+    <ClInclude Include="..\..\include\ignite\odbc\query\internal_query.h" />
     <ClInclude Include="..\..\include\ignite\odbc\query\primary_keys_query.h" 
/>
     <ClInclude Include="..\..\include\ignite\odbc\query\query.h" />
     <ClInclude 
Include="..\..\include\ignite\odbc\query\special_columns_query.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\query\streaming_query.h" />
     <ClInclude 
Include="..\..\include\ignite\odbc\query\table_metadata_query.h" />
     <ClInclude Include="..\..\include\ignite\odbc\query\type_info_query.h" />
     <ClInclude Include="..\..\include\ignite\odbc\result_page.h" />
     <ClInclude Include="..\..\include\ignite\odbc\row.h" />
     <ClInclude Include="..\..\include\ignite\odbc\socket_client.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_command.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_lexer.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_parser.h" />
+    <ClInclude 
Include="..\..\include\ignite\odbc\sql\sql_set_streaming_command.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_token.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_utils.h" />
     <ClInclude Include="..\..\include\ignite\odbc\ssl\secure_socket_client.h" 
/>
     <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_bindings.h" />
     <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_gateway.h" />
     <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_mode.h" />
     <ClInclude Include="..\..\include\ignite\odbc\statement.h" />
+    <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_batch.h" 
/>
+    <ClInclude 
Include="..\..\include\ignite\odbc\streaming\streaming_context.h" />
     <ClInclude Include="..\..\include\ignite\odbc\system\odbc_constants.h" />
     <ClInclude Include="..\..\include\ignite\odbc\system\tcp_socket_client.h" 
/>
     <ClInclude 
Include="..\..\include\ignite\odbc\system\ui\dsn_configuration_window.h" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters 
b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters
index 6da0111..f0e49b5 100644
--- a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters
+++ b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters
@@ -32,6 +32,12 @@
     <Filter Include="Code\ssl">
       
<UniqueIdentifier>{857734a3-6b29-412a-b75e-7fcc9d3fef8c}</UniqueIdentifier>
     </Filter>
+    <Filter Include="Code\sql">
+      
<UniqueIdentifier>{7f412a6b-279a-4a22-b87f-eafb1545a8d0}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="Code\streaming">
+      
<UniqueIdentifier>{a14eb935-4a95-4202-a83b-12e715cf1cf1}</UniqueIdentifier>
+    </Filter>
   </ItemGroup>
   <ItemGroup>
     <ClCompile Include="..\..\src\odbc.cpp">
@@ -169,6 +175,27 @@
     <ClCompile Include="..\..\src\nested_tx_mode.cpp">
       <Filter>Code</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\sql\sql_lexer.cpp">
+      <Filter>Code\sql</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\sql\sql_parser.cpp">
+      <Filter>Code\sql</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\sql\sql_set_streaming_command.cpp">
+      <Filter>Code\sql</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\sql\sql_utils.cpp">
+      <Filter>Code\sql</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\streaming\streaming_batch.cpp">
+      <Filter>Code\streaming</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\query\streaming_query.cpp">
+      <Filter>Code\query</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\streaming\streaming_context.cpp">
+      <Filter>Code\streaming</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <None Include="module.def">
@@ -335,5 +362,35 @@
     <ClInclude Include="..\..\include\ignite\odbc\nested_tx_mode.h">
       <Filter>Code</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_lexer.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_parser.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_token.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude 
Include="..\..\include\ignite\odbc\sql\sql_set_streaming_command.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_command.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\sql\sql_utils.h">
+      <Filter>Code\sql</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\query\internal_query.h">
+      <Filter>Code\query</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_batch.h">
+      <Filter>Code\streaming</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\odbc\query\streaming_query.h">
+      <Filter>Code\query</Filter>
+    </ClInclude>
+    <ClInclude 
Include="..\..\include\ignite\odbc\streaming\streaming_context.h">
+      <Filter>Code\streaming</Filter>
+    </ClInclude>
   </ItemGroup>
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp 
b/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp
index 7f7c2f4..17165e2 100644
--- a/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp
+++ b/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp
@@ -65,7 +65,7 @@ namespace ignite
                 // No-op.
             }
 
-            void ConnectionStringParser::ParseConnectionString(const char* 
str, size_t len, char delimeter,
+            void ConnectionStringParser::ParseConnectionString(const char* 
str, size_t len, char delimiter,
                 diagnostic::DiagnosticRecordStorage* diag)
             {
                 std::string connect_str(str, len);
@@ -75,7 +75,7 @@ namespace ignite
 
                 while (!connect_str.empty())
                 {
-                    size_t attr_begin = connect_str.rfind(delimeter);
+                    size_t attr_begin = connect_str.rfind(delimiter);
 
                     if (attr_begin == std::string::npos)
                         attr_begin = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/connection.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/connection.cpp 
b/modules/platforms/cpp/odbc/src/connection.cpp
index 5d01a0d..b580f12 100644
--- a/modules/platforms/cpp/odbc/src/connection.cpp
+++ b/modules/platforms/cpp/odbc/src/connection.cpp
@@ -63,9 +63,10 @@ namespace ignite
             autoCommit(true),
             parser(),
             config(),
-            info(config)
+            info(config),
+            streamingContext()
         {
-            // No-op.
+            streamingContext.SetConnection(*this);
         }
 
         Connection::~Connection()
@@ -375,7 +376,7 @@ namespace ignite
             return config;
         }
 
-        bool Connection::IsAutoCommit()
+        bool Connection::IsAutoCommit() const
         {
             return autoCommit;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/message.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/message.cpp 
b/modules/platforms/cpp/odbc/src/message.cpp
index 5730842..587d253 100644
--- a/modules/platforms/cpp/odbc/src/message.cpp
+++ b/modules/platforms/cpp/odbc/src/message.cpp
@@ -18,6 +18,8 @@
 #include "ignite/odbc/message.h"
 #include "ignite/odbc/utility.h"
 
+#include "ignite/odbc/streaming/streaming_batch.h"
+
 namespace
 {
     using namespace ignite;
@@ -123,7 +125,7 @@ namespace ignite
                 writer.WriteBool(autoCommit);
         }
 
-        QueryExecuteBatchtRequest::QueryExecuteBatchtRequest(const 
std::string& schema, const std::string& sql,
+        QueryExecuteBatchRequest::QueryExecuteBatchRequest(const std::string& 
schema, const std::string& sql,
             const app::ParameterSet& params, SqlUlen begin, SqlUlen end, bool 
last, int32_t timeout, bool autoCommit) :
             schema(schema),
             sql(sql),
@@ -137,12 +139,12 @@ namespace ignite
             // No-op.
         }
 
-        QueryExecuteBatchtRequest::~QueryExecuteBatchtRequest()
+        QueryExecuteBatchRequest::~QueryExecuteBatchRequest()
         {
             // No-op.
         }
 
-        void QueryExecuteBatchtRequest::Write(impl::binary::BinaryWriterImpl& 
writer, const ProtocolVersion& ver) const
+        void QueryExecuteBatchRequest::Write(impl::binary::BinaryWriterImpl& 
writer, const ProtocolVersion& ver) const
         {
             writer.WriteInt8(RequestType::EXECUTE_SQL_QUERY_BATCH);
 
@@ -262,6 +264,38 @@ namespace ignite
             writer.WriteInt32(pageSize);
         }
 
+        StreamingBatchRequest::StreamingBatchRequest(const std::string& schema,
+            const streaming::StreamingBatch& batch, bool last, int64_t order) :
+            schema(schema),
+            batch(batch),
+            last(last),
+            order(order)
+        {
+            // No-op.
+        }
+
+        StreamingBatchRequest::~StreamingBatchRequest()
+        {
+            // No-op.
+        }
+
+        void StreamingBatchRequest::Write(impl::binary::BinaryWriterImpl& 
writer, const ProtocolVersion&) const
+        {
+            writer.WriteInt8(RequestType::STREAMING_BATCH);
+
+            writer.WriteString(schema);
+
+            impl::interop::InteropOutputStream* stream = writer.GetStream();
+
+            writer.WriteInt32(batch.GetSize());
+
+            if (batch.GetSize() != 0)
+                stream->WriteInt8Array(batch.GetData(), batch.GetDataLength());
+
+            writer.WriteBool(last);
+            writer.WriteInt64(order);
+        }
+
         Response::Response() :
             status(ResponseStatus::UNKNOWN_ERROR),
             error()
@@ -358,7 +392,7 @@ namespace ignite
             ReadAffectedRows(reader, ver, affectedRows);
         }
 
-        QueryExecuteBatchResponse::QueryExecuteBatchResponse():
+        QueryExecuteBatchResponse::QueryExecuteBatchResponse() :
             affectedRows(0),
             errorMessage(),
             errorCode(1)
@@ -388,6 +422,26 @@ namespace ignite
             }
         }
 
+        StreamingBatchResponse::StreamingBatchResponse() :
+            errorMessage(),
+            errorCode(ResponseStatus::SUCCESS),
+            order(0)
+        {
+            // No-op.
+        }
+
+        StreamingBatchResponse::~StreamingBatchResponse()
+        {
+            // No-op.
+        }
+
+        void 
StreamingBatchResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, 
const ProtocolVersion&)
+        {
+            errorMessage = reader.ReadObject<std::string>();
+            errorCode = reader.ReadInt32();
+            order = reader.ReadInt64();
+        }
+
         QueryFetchResponse::QueryFetchResponse(ResultPage& resultPage) :
             queryId(0),
             resultPage(resultPage)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/query/batch_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp 
b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
index 1256c94..741875a 100644
--- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
@@ -144,9 +144,8 @@ namespace ignite
             {
                 const std::string& schema = connection.GetSchema();
 
-                QueryExecuteBatchtRequest req(schema, sql, params, begin, end, 
last, timeout,
+                QueryExecuteBatchRequest req(schema, sql, params, begin, end, 
last, timeout,
                     connection.IsAutoCommit());
-
                 QueryExecuteBatchResponse rsp;
 
                 try

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/streaming_query.cpp 
b/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
new file mode 100644
index 0000000..dd9302f
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/odbc/connection.h"
+#include "ignite/odbc/message.h"
+#include "ignite/odbc/log.h"
+#include "ignite/odbc/query/streaming_query.h"
+#include "ignite/odbc/sql/sql_set_streaming_command.h"
+
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace query
+        {
+            /**
+             * Constructor.
+             *
+             * @param diag Diagnostics collector passed to the base Query.
+             * @param connection Connection whose streaming context is used on execution.
+             * @param params Parameter set forwarded to the streaming context on execution.
+             */
+            StreamingQuery::StreamingQuery(
+                diagnostic::Diagnosable& diag,
+                Connection& connection,
+                const app::ParameterSet& params) :
+                Query(diag, QueryType::STREAMING),
+                connection(connection),
+                params(params)
+            {
+                // Nothing else to initialise.
+            }
+
+            /**
+             * Destructor.
+             */
+            StreamingQuery::~StreamingQuery()
+            {
+                // Nothing to do here.
+            }
+
+            /**
+             * Execute the query by handing the current SQL text and the
+             * parameter set over to the connection's streaming context.
+             *
+             * @return Result reported by the streaming context.
+             */
+            SqlResult::Type StreamingQuery::Execute()
+            {
+                return connection.GetStreamingContext().Execute(sql, params);
+            }
+
+            /**
+             * Get column metadata.
+             *
+             * @return Reference to a shared, always empty metadata vector.
+             */
+            const meta::ColumnMetaVector& StreamingQuery::GetMeta() const
+            {
+                static meta::ColumnMetaVector noColumns;
+
+                return noColumns;
+            }
+
+            /**
+             * Fetch next result row. A streaming query never has rows to fetch.
+             *
+             * @return Always SqlResult::AI_NO_DATA.
+             */
+            SqlResult::Type StreamingQuery::FetchNextRow(app::ColumnBindingMap&)
+            {
+                return SqlResult::AI_NO_DATA;
+            }
+
+            /**
+             * Get data of the specified column. Always fails: a status record
+             * is added and an error is returned.
+             *
+             * @return Always SqlResult::AI_ERROR.
+             */
+            SqlResult::Type StreamingQuery::GetColumn(uint16_t, app::ApplicationDataBuffer&)
+            {
+                diag.AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, "Column is not available.");
+
+                return SqlResult::AI_ERROR;
+            }
+
+            /**
+             * Close the query. Nothing is released here.
+             *
+             * @return Always SqlResult::AI_SUCCESS.
+             */
+            SqlResult::Type StreamingQuery::Close()
+            {
+                return SqlResult::AI_SUCCESS;
+            }
+
+            /**
+             * Check whether result data is available.
+             *
+             * @return Always false.
+             */
+            bool StreamingQuery::DataAvailable() const
+            {
+                return false;
+            }
+
+            /**
+             * Get the number of rows affected by the statement.
+             *
+             * @return Always zero.
+             */
+            int64_t StreamingQuery::AffectedRows() const
+            {
+                return 0;
+            }
+
+            /**
+             * Move to the next result set. There is never one.
+             *
+             * @return Always SqlResult::AI_NO_DATA.
+             */
+            SqlResult::Type StreamingQuery::NextResultSet()
+            {
+                return SqlResult::AI_NO_DATA;
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp 
b/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
new file mode 100644
index 0000000..5422bc6
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cctype>
+
+#include "ignite/odbc/odbc_error.h"
+#include "ignite/odbc/sql/sql_lexer.h"
+
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace
+        {
+            /**
+             * Check whether the character is a whitespace or a control character.
+             *
+             * The value is converted to unsigned char first: the <cctype>
+             * classification functions have undefined behaviour for negative
+             * arguments other than EOF, which is what a plain char holding a
+             * non-ASCII byte may produce.
+             *
+             * @param c Character to check.
+             * @return True if the character has to be skipped between tokens.
+             */
+            bool IsSpaceOrControl(char c)
+            {
+                const unsigned char uc = static_cast<unsigned char>(c);
+
+                return iscntrl(uc) != 0 || isspace(uc) != 0;
+            }
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param sql SQL text to split into tokens.
+         */
+        SqlLexer::SqlLexer(const std::string& sql) :
+            sql(sql),
+            pos(0),
+            currentToken(0, 0, TokenType::EOD)
+        {
+            // No-op.
+        }
+
+        /**
+         * Destructor.
+         */
+        SqlLexer::~SqlLexer()
+        {
+            // No-op.
+        }
+
+        /**
+         * Advance to the next token and make it the current one.
+         *
+         * Whitespace, control characters and "--" comments (up to the end of
+         * the line) are skipped. Every token returned is at least one
+         * character long, so repeated calls always make progress.
+         *
+         * @return True if a token was read, false if the end of data was
+         *     reached, or an error for an unclosed quoted identifier or
+         *     string literal.
+         */
+        OdbcExpected<bool> SqlLexer::Shift()
+        {
+            if (IsEod())
+            {
+                SetEod();
+
+                return false;
+            }
+
+            TokenType::Type tokenType = TokenType::EOD;
+
+            while (!IsEod())
+            {
+                int32_t tokenBegin = pos;
+
+                switch (sql[pos])
+                {
+                    case '-':
+                    {
+                        // Full-line comment.
+                        if (HaveData(1) && sql[pos + 1] == '-')
+                        {
+                            pos += 2;
+
+                            while (!IsEod() && sql[pos] != '\n' && sql[pos] != '\r')
+                                ++pos;
+
+                            continue;
+                        }
+
+                        // Minus.
+                        tokenType = TokenType::MINUS;
+
+                        break;
+                    }
+
+                    case '"':
+                    {
+                        // Quoted string.
+                        while (true)
+                        {
+                            ++pos;
+
+                            if (IsEod())
+                                return OdbcError(SqlState::SHY000_GENERAL_ERROR, "Unclosed quoted identifier.");
+
+                            if (sql[pos] == '"')
+                            {
+                                // A doubled quote is an escaped quote. Only one character of
+                                // look-ahead is needed to detect it.
+                                if (!HaveData(1) || sql[pos + 1] != '"')
+                                    break;
+
+                                ++pos;
+                            }
+                        }
+
+                        tokenType = TokenType::QUOTED;
+
+                        break;
+                    }
+
+                    case '\'':
+                    {
+                        // String literal.
+                        while (true)
+                        {
+                            ++pos;
+
+                            if (IsEod())
+                                return OdbcError(SqlState::SHY000_GENERAL_ERROR, "Unclosed string literal.");
+
+                            if (sql[pos] == '\'')
+                            {
+                                // A doubled quote is an escaped quote. Only one character of
+                                // look-ahead is needed to detect it.
+                                if (!HaveData(1) || sql[pos + 1] != '\'')
+                                    break;
+
+                                ++pos;
+                            }
+                        }
+
+                        tokenType = TokenType::STRING;
+
+                        break;
+                    }
+
+                    case '.':
+                    {
+                        tokenType = TokenType::DOT;
+
+                        break;
+                    }
+
+                    case ',':
+                    {
+                        tokenType = TokenType::COMMA;
+
+                        break;
+                    }
+
+                    case ';':
+                    {
+                        tokenType = TokenType::SEMICOLON;
+
+                        break;
+                    }
+
+                    case '(':
+                    {
+                        tokenType = TokenType::PARENTHESIS_LEFT;
+
+                        break;
+                    }
+
+                    case ')':
+                    {
+                        tokenType = TokenType::PARENTHESIS_RIGHT;
+
+                        break;
+                    }
+
+                    default:
+                    {
+                        // Skipping spaces.
+                        if (IsSpaceOrControl(sql[pos]))
+                        {
+                            do
+                            {
+                                ++pos;
+                            }
+                            while (!IsEod() && IsSpaceOrControl(sql[pos]));
+
+                            continue;
+                        }
+
+                        // Word.
+                        if (!IsDelimiter(sql[pos]))
+                        {
+                            while (!IsEod() && !IsDelimiter(sql[pos]))
+                                ++pos;
+
+                            // Step back onto the last character of the word: the common
+                            // increment after the switch moves past it.
+                            --pos;
+                        }
+
+                        // Otherwise this is a delimiter without a dedicated token type
+                        // (e.g. '='). It is left in place so that the common increment
+                        // consumes it as a one-character token. Without this the token
+                        // would be empty and the position would never advance.
+
+                        tokenType = TokenType::WORD;
+
+                        break;
+                    }
+                }
+
+                ++pos;
+
+                if (tokenType != TokenType::EOD)
+                {
+                    currentToken = SqlToken(&sql[tokenBegin], pos - tokenBegin, tokenType);
+
+                    return true;
+                }
+            }
+
+            SetEod();
+
+            return false;
+        }
+
+        /**
+         * Shift to the next token and check that it is the expected one.
+         *
+         * @param typ Expected token type.
+         * @param expected Expected token text in lower case.
+         * @return True if a token was read and both its type and its
+         *     lower-cased text match; false otherwise, including on a
+         *     lexing error.
+         */
+        bool SqlLexer::ExpectNextToken(TokenType::Type typ, const char* expected)
+        {
+            OdbcExpected<bool> hasNext = Shift();
+
+            if (!hasNext.IsOk() || !*hasNext)
+                return false;
+
+            const SqlToken& token = GetCurrentToken();
+
+            return token.GetType() == typ && token.ToLower() == expected;
+        }
+
+        /**
+         * Check whether the end of data is reached.
+         *
+         * @return True if the current position is at or past the end of the SQL text.
+         */
+        bool SqlLexer::IsEod() const
+        {
+            return pos >= static_cast<int32_t>(sql.size());
+        }
+
+        /**
+         * Move the position to the end of the SQL text and make the current
+         * token an empty end-of-data token.
+         */
+        void SqlLexer::SetEod()
+        {
+            pos = static_cast<int32_t>(sql.size());
+
+            currentToken = SqlToken(0, 0, TokenType::EOD);
+        }
+
+        /**
+         * Check whether the character at offset num from the current position exists.
+         *
+         * @param num Offset from the current position.
+         * @return True if index (pos + num) is inside the SQL text.
+         */
+        bool SqlLexer::HaveData(int32_t num) const
+        {
+            return static_cast<size_t>(pos + num) < sql.size();
+        }
+
+        /**
+         * Check whether the character terminates a word.
+         *
+         * @param c Character to check.
+         * @return True for everything except alphanumeric characters and underscore.
+         */
+        bool SqlLexer::IsDelimiter(int c)
+        {
+            // Callers pass plain char values which may have been sign-extended;
+            // restore the byte value before handing it to <cctype>.
+            const unsigned char uc = static_cast<unsigned char>(c);
+
+            return !isalnum(uc) && uc != '_';
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp 
b/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
new file mode 100644
index 0000000..d6b5d7d
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/common/utils.h>
+
+#include <ignite/odbc/odbc_error.h>
+#include <ignite/odbc/sql/sql_set_streaming_command.h>
+#include <ignite/odbc/sql/sql_parser.h>
+
+/** First keyword of the only command recognised by the parser. */
+const static std::string WORD_SET("set");
+
+/** Second keyword of the only command recognised by the parser. */
+const static std::string WORD_STREAMING("streaming");
+
+namespace ignite
+{
+    namespace odbc
+    {
+        /**
+         * Constructor.
+         *
+         * @param sql SQL text to parse.
+         */
+        SqlParser::SqlParser(const std::string& sql) :
+            lexer(sql)
+        {
+            // No-op.
+        }
+
+        /**
+         * Destructor.
+         */
+        SqlParser::~SqlParser()
+        {
+            // No-op.
+        }
+
+        /**
+         * Get the next command from the SQL text.
+         *
+         * Leading semicolons (empty commands) are skipped.
+         *
+         * @return Parsed command, or a null pointer when there are no more tokens.
+         * @throw OdbcError if the command starts with anything but a word,
+         *     or if the word does not start a known command.
+         */
+        std::auto_ptr<SqlCommand> SqlParser::GetNextCommand()
+        {
+            for (;;)
+            {
+                if (!*lexer.Shift())
+                    return std::auto_ptr<SqlCommand>();
+
+                const SqlToken& current = lexer.GetCurrentToken();
+
+                // Empty command. Skipping.
+                if (current.GetType() == TokenType::SEMICOLON)
+                    continue;
+
+                if (current.GetType() == TokenType::WORD)
+                    return ProcessCommand();
+
+                // Every other token type can not start a command.
+                throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                    "Unexpected token: '" + current.ToString() + "'");
+            }
+        }
+
+        /**
+         * Parse the command that starts at the current token.
+         *
+         * Only "SET STREAMING ..." is recognised; the rest of the command is
+         * parsed by SqlSetStreamingCommand::Parse().
+         *
+         * @return Parsed command.
+         * @throw OdbcError if the tokens do not form a known command.
+         */
+        std::auto_ptr<SqlCommand> SqlParser::ProcessCommand()
+        {
+            // The reference is re-read after the lexer shifts, so it names
+            // the first word in the first check and the second word later on.
+            const SqlToken& current = lexer.GetCurrentToken();
+
+            if (WORD_SET != current.ToLower() ||
+                !*lexer.Shift() ||
+                current.GetType() != TokenType::WORD ||
+                WORD_STREAMING != current.ToLower())
+            {
+                throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                    "Unexpected token: '" + current.ToString() + "'");
+            }
+
+            std::auto_ptr<SqlCommand> streamingCmd(new SqlSetStreamingCommand);
+
+            streamingCmd->Parse(lexer);
+
+            return streamingCmd;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp 
b/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp
new file mode 100644
index 0000000..5047917
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/common/utils.h>
+
+#include <ignite/odbc/odbc_error.h>
+#include <ignite/odbc/sql/sql_lexer.h>
+#include <ignite/odbc/sql/sql_token.h>
+#include <ignite/odbc/sql/sql_utils.h>
+#include <ignite/odbc/sql/sql_set_streaming_command.h>
+
+/** Name of the option that sets the batch size. */
+const static std::string WORD_BATCH_SIZE("batch_size");
+
+/** Name of the option that sets the per-node buffer size. */
+const static std::string WORD_PER_NODE_BUFFER_SIZE("per_node_buffer_size");
+
+/** Name of the option that sets the number of per-node parallel operations. */
+const static std::string WORD_PER_NODE_PARALLEL_OPERATIONS("per_node_parallel_operations");
+
+/** Name of the option that sets the allow-overwrite flag. */
+const static std::string WORD_ALLOW_OVERWRITE("allow_overwrite");
+
+/** Name of the option that sets the flush frequency. */
+const static std::string WORD_FLUSH_FREQUENCY("flush_frequency");
+
+/** Name of the option that sets the ordered flag. */
+const static std::string WORD_ORDERED("ordered");
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace
+        {
+            /**
+             * Check whether the string is a non-empty sequence of decimal digits.
+             *
+             * Characters are converted to unsigned char before classification,
+             * as isdigit() has undefined behaviour for negative arguments.
+             *
+             * @param str String to check.
+             * @return True if the string is not empty and contains digits only.
+             */
+            bool IsDecimalNumber(const std::string& str)
+            {
+                if (str.empty())
+                    return false;
+
+                for (std::string::const_iterator it = str.begin(); it != str.end(); ++it)
+                {
+                    if (!isdigit(static_cast<unsigned char>(*it)))
+                        return false;
+                }
+
+                return true;
+            }
+        }
+
+        /**
+         * Default constructor. Streaming is disabled and every option has its
+         * default value.
+         */
+        SqlSetStreamingCommand::SqlSetStreamingCommand() :
+            SqlCommand(SqlCommandType::SET_STREAMING),
+            enabled(false),
+            allowOverwrite(false),
+            batchSize(DEFAULT_STREAM_BATCH_SIZE),
+            parallelOpsPerNode(0),
+            bufferSizePerNode(0),
+            flushFrequency(0),
+            ordered(false)
+        {
+            // No-op.
+        }
+
+        /**
+         * Destructor.
+         */
+        SqlSetStreamingCommand::~SqlSetStreamingCommand()
+        {
+            // No-op.
+        }
+
+        /**
+         * Parse the part of the command that follows "SET STREAMING".
+         *
+         * The first token is the ON/OFF flag. It may be followed by any
+         * number of options up to a semicolon or the end of the statement.
+         * Options are accepted only when streaming is being enabled.
+         *
+         * @param lexer Lexer positioned at the STREAMING keyword.
+         * @throw OdbcError on any syntax error.
+         */
+        void SqlSetStreamingCommand::Parse(SqlLexer& lexer)
+        {
+            enabled = ExpectBool(lexer);
+
+            // Refers to the lexer's current token, so it follows every shift.
+            const SqlToken& token = lexer.GetCurrentToken();
+
+            while (*lexer.Shift() && token.GetType() != TokenType::SEMICOLON)
+            {
+                // Taken before the value of the option is read from the lexer.
+                const std::string option = token.ToLower();
+
+                if (option == WORD_BATCH_SIZE)
+                {
+                    CheckEnabled(token);
+
+                    batchSize = ExpectPositiveInteger(lexer, "batch size");
+                }
+                else if (option == WORD_PER_NODE_BUFFER_SIZE)
+                {
+                    CheckEnabled(token);
+
+                    bufferSizePerNode = ExpectPositiveInteger(lexer, "per node buffer size");
+                }
+                else if (option == WORD_PER_NODE_PARALLEL_OPERATIONS)
+                {
+                    CheckEnabled(token);
+
+                    parallelOpsPerNode = ExpectPositiveInteger(lexer, "per node parallel operations number");
+                }
+                else if (option == WORD_ALLOW_OVERWRITE)
+                {
+                    CheckEnabled(token);
+
+                    allowOverwrite = ExpectBool(lexer);
+                }
+                else if (option == WORD_FLUSH_FREQUENCY)
+                {
+                    CheckEnabled(token);
+
+                    flushFrequency = ExpectPositiveInteger(lexer, "flush frequency");
+                }
+                else if (option == WORD_ORDERED)
+                {
+                    CheckEnabled(token);
+
+                    ordered = true;
+                }
+                else
+                    ThrowUnexpectedTokenError(token, "additional parameter of SET STREAMING command or semicolon");
+            }
+        }
+
+        /**
+         * Ensure that options are given only when streaming is being enabled.
+         *
+         * @param token Option token, used for the error message.
+         * @throw OdbcError if the command disables streaming.
+         */
+        void SqlSetStreamingCommand::CheckEnabled(const SqlToken& token) const
+        {
+            if (!enabled)
+                ThrowUnexpectedTokenError(token, "no parameters with STREAMING OFF command");
+        }
+
+        /**
+         * Throw the "unexpected token" syntax error.
+         *
+         * @param token Offending token.
+         * @param expected Description of what was expected instead.
+         * @throw OdbcError always.
+         */
+        void SqlSetStreamingCommand::ThrowUnexpectedTokenError(const SqlToken& token, const std::string& expected)
+        {
+            throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                "Unexpected token: '" + token.ToString() + "', " + expected + " expected.");
+        }
+
+        /**
+         * Throw the "unexpected end of statement" syntax error.
+         *
+         * @param expected Description of what was expected.
+         * @throw OdbcError always.
+         */
+        void SqlSetStreamingCommand::ThrowUnexpectedEndOfStatement(const std::string& expected)
+        {
+            throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                "Unexpected end of statement: " + expected + " expected.");
+        }
+
+        /**
+         * Read an integer that fits into 32 bits, with an optional leading minus.
+         *
+         * @param lexer Lexer positioned right before the number.
+         * @return Parsed value.
+         * @throw OdbcError if the statement ends, or if the next token is
+         *     not a decimal number in the int32_t range.
+         */
+        int32_t SqlSetStreamingCommand::ExpectInt(SqlLexer& lexer)
+        {
+            if (!*lexer.Shift())
+                ThrowUnexpectedEndOfStatement("integer number");
+
+            const SqlToken& token = lexer.GetCurrentToken();
+
+            int sign = 1;
+
+            if (token.GetType() == TokenType::MINUS)
+            {
+                sign = -1;
+
+                if (!*lexer.Shift())
+                    ThrowUnexpectedEndOfStatement("integer number");
+            }
+
+            std::string str = token.ToString();
+
+            // An empty token must be rejected explicitly: an "all characters are
+            // digits" test alone is trivially true for an empty string.
+            if (token.GetType() == TokenType::WORD && IsDecimalNumber(str))
+            {
+                int64_t val = sign * common::LexicalCast<int64_t>(str);
+
+                if (val >= INT32_MIN && val <= INT32_MAX)
+                    return static_cast<int32_t>(val);
+            }
+
+            ThrowUnexpectedTokenError(token, "integer number");
+
+            // Not reached: the call above always throws.
+            return 0;
+        }
+
+        /**
+         * Read a strictly positive 32-bit integer.
+         *
+         * @param lexer Lexer positioned right before the number.
+         * @param description Name of the value, used for the error message.
+         * @return Parsed value.
+         * @throw OdbcError if the value is missing, malformed, zero or negative.
+         */
+        int32_t SqlSetStreamingCommand::ExpectPositiveInteger(SqlLexer& lexer, const std::string& description)
+        {
+            int32_t val = ExpectInt(lexer);
+
+            if (val <= 0)
+            {
+                throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                    "Invalid " + description + " - positive integer number is expected.");
+            }
+
+            return val;
+        }
+
+        /**
+         * Read a boolean flag.
+         *
+         * @param lexer Lexer positioned right before the flag.
+         * @return Parsed value.
+         * @throw OdbcError if the statement ends or the next token can not be
+         *     converted by sql_utils::TokenToBoolean().
+         */
+        bool SqlSetStreamingCommand::ExpectBool(SqlLexer& lexer)
+        {
+            // There is no token to report when the statement ends here, so use
+            // the same error as ExpectInt() does in this situation.
+            if (!*lexer.Shift())
+                ThrowUnexpectedEndOfStatement("ON or OFF");
+
+            const SqlToken& token = lexer.GetCurrentToken();
+
+            return *sql_utils::TokenToBoolean(token);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp 
b/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp
new file mode 100644
index 0000000..2e7066f
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/odbc/odbc_error.h>
+
+#include <ignite/odbc/sql/sql_lexer.h>
+#include <ignite/odbc/sql/sql_utils.h>
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace sql_utils
+        {
+            /**
+             * Check whether the SQL text is a command processed by the driver itself.
+             *
+             * That is the case when, after any number of leading semicolons,
+             * the text starts with the words SET and STREAMING (case-insensitive).
+             *
+             * @param sql SQL text.
+             * @return True for an internal command; false otherwise, including
+             *     when the text can not be tokenized.
+             */
+            bool IsInternalCommand(const std::string& sql)
+            {
+                SqlLexer lexer(sql);
+
+                for (;;)
+                {
+                    OdbcExpected<bool> shifted = lexer.Shift();
+
+                    if (!shifted.IsOk() || !*shifted)
+                        return false;
+
+                    const SqlToken& current = lexer.GetCurrentToken();
+
+                    // Leading semicolons are empty commands: keep looking.
+                    if (current.GetType() == TokenType::SEMICOLON)
+                        continue;
+
+                    // The first meaningful token has to be the SET keyword.
+                    if (current.GetType() != TokenType::WORD || current.ToLower() != "set")
+                        return false;
+
+                    break;
+                }
+
+                return lexer.ExpectNextToken(TokenType::WORD, "streaming");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/statement.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/statement.cpp 
b/modules/platforms/cpp/odbc/src/statement.cpp
index fd2d4ec..8ba0902 100644
--- a/modules/platforms/cpp/odbc/src/statement.cpp
+++ b/modules/platforms/cpp/odbc/src/statement.cpp
@@ -26,12 +26,17 @@
 #include "ignite/odbc/query/primary_keys_query.h"
 #include "ignite/odbc/query/type_info_query.h"
 #include "ignite/odbc/query/special_columns_query.h"
+#include "ignite/odbc/query/streaming_query.h"
+#include "ignite/odbc/query/internal_query.h"
 #include "ignite/odbc/connection.h"
 #include "ignite/odbc/utility.h"
 #include "ignite/odbc/message.h"
 #include "ignite/odbc/statement.h"
 #include "ignite/odbc/log.h"
 #include "ignite/odbc/odbc_error.h"
+#include "ignite/odbc/sql/sql_utils.h"
+#include "ignite/odbc/sql/sql_parser.h"
+#include "ignite/odbc/sql/sql_set_streaming_command.h"
 
 namespace ignite
 {
@@ -281,7 +286,17 @@ namespace ignite
 
                 case SQL_ATTR_PARAMSET_SIZE:
                 {
-                    
parameters.SetParamSetSize(reinterpret_cast<SqlUlen>(value));
+                    SqlUlen size = reinterpret_cast<SqlUlen>(value);
+
+                    if (size > 1 && IsStreamingActive())
+                    {
+                        
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
+                            "Batching is not supported in streaming mode.");
+
+                        return SqlResult::AI_ERROR;
+                    }
+
+                    parameters.SetParamSetSize(size);
 
                     break;
                 }
@@ -558,14 +573,58 @@ namespace ignite
             IGNITE_ODBC_API_CALL(InternalPrepareSqlQuery(query));
         }
 
+        /**
+         * Prepare a command that is processed by the driver itself.
+         *
+         * The command is parsed and stored as the current query of the
+         * statement. A parsing error is turned into a status record.
+         *
+         * @param query SQL text of the command.
+         * @return SqlResult::AI_SUCCESS, or SqlResult::AI_ERROR if parsing failed.
+         */
+        SqlResult::Type Statement::ProcessInternalCommand(const std::string& query)
+        {
+            try
+            {
+                SqlParser sqlParser(query);
+
+                std::auto_ptr<SqlCommand> command = sqlParser.GetNextCommand();
+
+                assert(command.get() != 0);
+
+                parameters.Prepare();
+
+                // The command pointer is handed over to the new query object.
+                currentQuery.reset(new query::InternalQuery(*this, query, command));
+
+                return SqlResult::AI_SUCCESS;
+            }
+            catch (const OdbcError& parseErr)
+            {
+                AddStatusRecord(parseErr);
+
+                return SqlResult::AI_ERROR;
+            }
+        }
+
+        /**
+         * Check whether streaming is enabled in the streaming context of the
+         * statement's connection.
+         *
+         * @return True if the streaming context reports itself as enabled.
+         */
+        bool Statement::IsStreamingActive() const
+        {
+            return connection.GetStreamingContext().IsEnabled();
+        }
+
         SqlResult::Type Statement::InternalPrepareSqlQuery(const std::string& query)
         {
-            if (currentQuery.get())
-                currentQuery->Close();
+            if (sql_utils::IsInternalCommand(query))
+                return ProcessInternalCommand(query);
 
             // Resetting parameters types as we are changing the query.
             parameters.Prepare();
 
+            if (IsStreamingActive())
+            {
+                // The statement may still hold a query of another kind, e.g. an
+                // internal command that was prepared but not executed, or a query
+                // prepared before streaming was enabled on the connection. Such a
+                // query must be replaced: casting it to StreamingQuery is undefined
+                // behaviour.
+                if (currentQuery.get() && currentQuery->GetType() != query::QueryType::STREAMING)
+                {
+                    currentQuery->Close();
+
+                    currentQuery.reset();
+                }
+
+                if (!currentQuery.get())
+                    currentQuery.reset(new query::StreamingQuery(*this, connection, parameters));
+
+                query::StreamingQuery* currentQuery0 = static_cast<query::StreamingQuery*>(currentQuery.get());
+
+                currentQuery0->PrepareQuery(query);
+
+                return SqlResult::AI_SUCCESS;
+            }
+
+            if (currentQuery.get())
+                currentQuery->Close();
+
             currentQuery.reset(new query::DataQuery(*this, connection, query, parameters, timeout));
 
             return SqlResult::AI_SUCCESS;
@@ -600,6 +659,13 @@ namespace ignite
                 return SqlResult::AI_ERROR;
             }
 
+            if (currentQuery->GetType() == query::QueryType::INTERNAL)
+            {
+                // Report the real outcome: ProcessInternalQuery() returns a
+                // non-success result when executing the enabling query fails,
+                // and that must not be presented to the caller as a success.
+                return ProcessInternalQuery();
+            }
+
+
             if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == 
query::QueryType::DATA)
             {
                 query::DataQuery& qry = 
static_cast<query::DataQuery&>(*currentQuery);
@@ -613,12 +679,21 @@ namespace ignite
                 currentQuery.reset(new query::DataQuery(*this, connection, 
qry.GetSql(), parameters, timeout));
             }
 
+            if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == 
query::QueryType::STREAMING)
+            {
+                
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
+                    "Batching is not supported in streaming mode.");
+
+                return SqlResult::AI_ERROR;
+            }
+
             if (parameters.IsDataAtExecNeeded())
             {
-                if (currentQuery->GetType() == query::QueryType::BATCH)
+                if (currentQuery->GetType() == query::QueryType::BATCH ||
+                    currentQuery->GetType() == query::QueryType::STREAMING)
                 {
                     
AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
-                        "Data-at-execution is not supported together with 
batching.");
+                        "Data-at-execution is not supported with batching.");
 
                     return SqlResult::AI_ERROR;
                 }
@@ -629,6 +704,43 @@ namespace ignite
             return currentQuery->Execute();
         }
 
+        /**
+         * Execute the internal (SET STREAMING) command held as the current query.
+         *
+         * Streaming is stopped first. If the command enables streaming, its
+         * SQL text is executed as a regular data query, the streaming context
+         * of the connection is enabled with the parsed command, and the
+         * current query is replaced with a new streaming query.
+         *
+         * @return SqlResult::AI_SUCCESS, or the result of the enabling query
+         *     if it is anything else.
+         */
+        SqlResult::Type Statement::ProcessInternalQuery()
+        {
+            assert(currentQuery->GetType() == query::QueryType::INTERNAL);
+
+            query::InternalQuery* internalQry = static_cast<query::InternalQuery*>(currentQuery.get());
+
+            LOG_MSG("Processing internal query: " << internalQry->GetQuery());
+
+            assert(internalQry->GetCommand().GetType() == SqlCommandType::SET_STREAMING);
+
+            SqlSetStreamingCommand& streamingCmd = static_cast<SqlSetStreamingCommand&>(internalQry->GetCommand());
+
+            // NOTE(review): the result of StopStreaming() is discarded here - confirm
+            // that a failure to stop streaming is meant to be ignored.
+            StopStreaming();
+
+            if (!streamingCmd.IsEnabled())
+                return SqlResult::AI_SUCCESS;
+
+            LOG_MSG("Sending start streaming command");
+
+            query::DataQuery enablingQuery(*this, connection, internalQry->GetQuery(), parameters, timeout);
+
+            SqlResult::Type enableRes = enablingQuery.Execute();
+
+            if (enableRes != SqlResult::AI_SUCCESS)
+                return enableRes;
+
+            LOG_MSG("Preparing streaming context on client");
+
+            connection.GetStreamingContext().Enable(streamingCmd);
+
+            std::auto_ptr<query::Query> streamingQry(new query::StreamingQuery(*this, connection, parameters));
+
+            // After the swap the internal query is owned by the local pointer
+            // and is destroyed when this function returns.
+            std::swap(currentQuery, streamingQry);
+
+            return SqlResult::AI_SUCCESS;
+        }
+
         void Statement::ExecuteGetColumnsMetaQuery(const std::string& schema,
             const std::string& table, const std::string& column)
         {
@@ -825,6 +937,18 @@ namespace ignite
             return result;
         }
 
+        /**
+         * Disable the streaming context of the connection if streaming is active.
+         *
+         * @return SqlResult::AI_SUCCESS if streaming was not active, otherwise
+         *     the result reported by the streaming context.
+         */
+        SqlResult::Type Statement::StopStreaming()
+        {
+            if (IsStreamingActive())
+            {
+                LOG_MSG("Stopping streaming");
+
+                return connection.GetStreamingContext().Disable();
+            }
+
+            return SqlResult::AI_SUCCESS;
+        }
+
         void Statement::FetchScroll(int16_t orientation, int64_t offset)
         {
             IGNITE_ODBC_API_CALL(InternalFetchScroll(orientation, offset));

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp 
b/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp
new file mode 100644
index 0000000..2cdb281
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/interop/interop_output_stream.h"
+#include "ignite/impl/binary/binary_writer_impl.h"
+
+#include "ignite/odbc/app/parameter_set.h"
+#include "ignite/odbc/streaming/streaming_batch.h"
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace streaming
+        {
+            /**
+             * Create an empty batch: no remembered SQL text, zero rows, and a
+             * data buffer constructed with an argument of 16384 (16 * 1024).
+             */
+            StreamingBatch::StreamingBatch() :
+                currentSql(), size(0), data(16 * 1024)
+            {
+                // Nothing else to initialize.
+            }
+
+            /**
+             * Destructor. Performs no explicit work of its own.
+             */
+            StreamingBatch::~StreamingBatch()
+            {
+                // Nothing to release explicitly.
+            }
+
+            /**
+             * Serialize one more row into the batch data buffer.
+             *
+             * Writing starts at the current length of the data buffer. The SQL
+             * text is written only when it differs from the text remembered
+             * from the previous row; otherwise a null marker is written instead.
+             * The parameters are written after that and the row counter is
+             * incremented.
+             *
+             * @param sql SQL text of the row.
+             * @param params Parameter set written for the row.
+             */
+            void StreamingBatch::AddRow(const std::string& sql, const app::ParameterSet& params)
+            {
+                impl::interop::InteropOutputStream stream(&data);
+
+                stream.Position(data.Length());
+
+                impl::binary::BinaryWriterImpl rowWriter(&stream, 0);
+
+                if (currentSql == sql)
+                {
+                    // Same statement as the remembered one: emit a null instead of the text.
+                    rowWriter.WriteNull();
+                }
+                else
+                {
+                    currentSql = sql;
+
+                    rowWriter.WriteString(sql);
+                }
+
+                params.Write(rowWriter);
+                ++size;
+
+                stream.Synchronize();
+            }
+
+            /**
+             * Reset the batch to its empty state: data length is set to zero,
+             * the row counter is zeroed and the remembered SQL text is erased.
+             */
+            void StreamingBatch::Clear()
+            {
+                data.Length(0);
+
+                size = 0;
+
+                currentSql.clear();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp 
b/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp
new file mode 100644
index 0000000..6953a3b
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/odbc/connection.h"
+#include "ignite/odbc/message.h"
+#include "ignite/odbc/log.h"
+#include "ignite/odbc/sql/sql_set_streaming_command.h"
+
+#include "ignite/odbc/streaming/streaming_context.h"
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace streaming
+        {
+            /**
+             * Create a disabled streaming context with a null connection, zero
+             * batch size, zero batch order and a default-constructed batch.
+             */
+            StreamingContext::StreamingContext() :
+                connection(0), batchSize(0), order(0), enabled(false), currentBatch()
+            {
+                // Nothing else to initialize.
+            }
+
+            /**
+             * Destructor. Performs no explicit work of its own.
+             */
+            StreamingContext::~StreamingContext()
+            {
+                // Nothing to release explicitly.
+            }
+
+            /**
+             * Enable streaming using the settings of the given command.
+             *
+             * If streaming is already enabled it is disabled first; when that
+             * fails, its result is returned and the context is not re-enabled.
+             *
+             * NOTE(review): the batch order counter is not reset here - confirm
+             * that this matches what the server expects on re-enable.
+             *
+             * @param cmd Command the batch size is taken from.
+             * @return AI_SUCCESS on success, or the failed result of Disable().
+             */
+            SqlResult::Type StreamingContext::Enable(const SqlSetStreamingCommand& cmd)
+            {
+                if (enabled)
+                {
+                    const SqlResult::Type disableRes = Disable();
+
+                    if (disableRes != SqlResult::AI_SUCCESS)
+                        return disableRes;
+                }
+
+                batchSize = cmd.GetBatchSize();
+
+                enabled = true;
+
+                return SqlResult::AI_SUCCESS;
+            }
+
+            /**
+             * Disable streaming.
+             *
+             * When the context is enabled, a final flush is performed before
+             * the enabled flag is dropped; the flag is dropped whatever the
+             * flush returns.
+             *
+             * @return AI_SUCCESS if the context was not enabled, otherwise the
+             *     result of the final flush.
+             */
+            SqlResult::Type StreamingContext::Disable()
+            {
+                LOG_MSG("Disabling streaming context.");
+
+                if (!enabled)
+                    return SqlResult::AI_SUCCESS;
+
+                const SqlResult::Type flushRes = Flush(true);
+
+                enabled = false;
+
+                return flushRes;
+            }
+
+            /**
+             * Add a row to the current batch and flush once the batch is full.
+             *
+             * The context must be enabled (checked with an assertion).
+             *
+             * @param sql SQL text of the row.
+             * @param params Parameter set of the row.
+             * @return AI_SUCCESS while the batch holds fewer rows than the batch
+             *     size, otherwise the result of the non-final flush.
+             */
+            SqlResult::Type StreamingContext::Execute(const std::string& sql, const app::ParameterSet& params)
+            {
+                assert(enabled);
+
+                currentBatch.AddRow(sql, params);
+
+                if (currentBatch.GetSize() >= batchSize)
+                    return Flush(false);
+
+                return SqlResult::AI_SUCCESS;
+            }
+
+            /**
+             * Send the current batch and clear it.
+             *
+             * An empty batch is sent only when it is the last one; otherwise
+             * nothing is sent. The batch is cleared after the request whatever
+             * result the request produced.
+             *
+             * @param last Whether this is the final flush.
+             * @return AI_SUCCESS if there was nothing to send, otherwise the
+             *     result of the batch request.
+             */
+            SqlResult::Type StreamingContext::Flush(bool last)
+            {
+                LOG_MSG("Flushing data");
+
+                if (currentBatch.GetSize() != 0 || last)
+                {
+                    const SqlResult::Type sendRes = MakeRequestStreamingBatch(last);
+
+                    currentBatch.Clear();
+
+                    return sendRes;
+                }
+
+                return SqlResult::AI_SUCCESS;
+            }
+
+            /**
+             * Send the current batch in a single streaming batch request and
+             * translate the outcome into a result code.
+             *
+             * On every failure path a status record is added to the connection
+             * and AI_ERROR is returned. The batch order counter is incremented
+             * only after a fully successful exchange.
+             *
+             * @param last Value passed to the request as its "last" flag.
+             * @return AI_SUCCESS on success, AI_ERROR otherwise.
+             */
+            SqlResult::Type StreamingContext::MakeRequestStreamingBatch(bool last)
+            {
+                assert(connection != 0);
+
+                // Kept as a named reference rather than inlined into the request
+                // constructor call, so that its lifetime spans the whole exchange.
+                const std::string& schema = connection->GetSchema();
+
+                StreamingBatchRequest request(schema, currentBatch, last, order);
+                StreamingBatchResponse response;
+
+                try
+                {
+                    connection->SyncMessage(request, response);
+                }
+                catch (const OdbcError& odbcErr)
+                {
+                    connection->AddStatusRecord(odbcErr);
+
+                    return SqlResult::AI_ERROR;
+                }
+                catch (const IgniteError& igniteErr)
+                {
+                    connection->AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, igniteErr.GetText());
+
+                    return SqlResult::AI_ERROR;
+                }
+
+                // The exchange completed without an exception: the batch content is
+                // dropped before the response status is inspected.
+                currentBatch.Clear();
+
+                // Status of the response as a whole.
+                if (response.GetStatus() != ResponseStatus::SUCCESS)
+                {
+                    LOG_MSG("Error: " << response.GetError());
+
+                    connection->AddStatusRecord(ResponseStatusToSqlState(response.GetStatus()), response.GetError());
+
+                    return SqlResult::AI_ERROR;
+                }
+
+                // Separate error code carried inside the response.
+                if (response.GetErrorCode() != ResponseStatus::SUCCESS)
+                {
+                    LOG_MSG("Error: " << response.GetErrorMessage());
+
+                    connection->AddStatusRecord(ResponseStatusToSqlState(response.GetErrorCode()), response.GetErrorMessage());
+
+                    return SqlResult::AI_ERROR;
+                }
+
+                assert(order == response.GetOrder());
+
+                ++order;
+
+                return SqlResult::AI_SUCCESS;
+            }
+        }
+    }
+}
+

Reply via email to