Repository: ignite Updated Branches: refs/heads/ignite-2.7 ba7833c05 -> 9f8d331dc
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
new file mode 100644
index 0000000..701bb3c
--- /dev/null
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_utils.h
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_SQL_SQL_UTILS
+#define _IGNITE_ODBC_SQL_SQL_UTILS
+
+#include <string>
+
+#include <ignite/odbc/odbc_error.h>
+#include <ignite/odbc/sql/sql_token.h>
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace sql_utils
+        {
+            /**
+             * Interpret an SQL token as a boolean switch.
+             *
+             * The lower-cased token text is compared against the accepted spellings:
+             * "1" and "on" give @c true, "0" and "off" give @c false.
+             *
+             * @param token Token to interpret.
+             * @return Boolean value, or a syntax error if the token is not ON, OFF, 1 or 0.
+             */
+            inline OdbcExpected<bool> TokenToBoolean(const SqlToken& token)
+            {
+                const std::string text = token.ToLower();
+
+                const bool enabled = (text == "on" || text == "1");
+
+                if (!enabled && text != "off" && text != "0")
+                {
+                    return OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION,
+                        "Unexpected token: '" + token.ToString() + "', ON, OFF, 1 or 0 expected.");
+                }
+
+                return enabled;
+            }
+
+            /**
+             * Check if the SQL is internal command.
+             *
+             * @param sql SQL request string.
+             * @return @c true if internal.
+             */
+            bool IsInternalCommand(const std::string& sql);
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_SQL_SQL_UTILS
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
index 31d07de..56eea6c 100644
--- a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h
@@ -29,6 +29,7 @@ #include "ignite/odbc/app/parameter_set.h" #include "ignite/odbc/diagnostic/diagnosable_adapter.h" #include "ignite/odbc/common_types.h" +#include "sql/sql_set_streaming_command.h" namespace ignite {
@@ -454,6 +455,28 @@ namespace ignite SqlResult::Type InternalClose(); /** + * Stop streaming. + * + * @return Operation result. + */ + SqlResult::Type StopStreaming(); + + /** + * Process internal SQL command. + * + * @param query SQL query. + * @return Operation result. + */ + SqlResult::Type ProcessInternalCommand(const std::string& query); + + /** + * Check if the streaming is active currently. + * + * @return @c true, if the streaming is active. + */ + bool IsStreamingActive() const; + + /** * Prepare SQL query. * * @param query SQL query.
@@ -477,6 +500,13 @@ namespace ignite SqlResult::Type InternalExecuteSqlQuery(); /** + * Process internal query.
+ * + * @return Operation result. + */ + SqlResult::Type ProcessInternalQuery(); + + /** + * Fetch query result row with offset * @param orientation Fetch type * @param offset Fetch offset
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
new file mode 100644
index 0000000..752dd63
--- /dev/null
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_batch.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_STREAMING_STREAMING_BATCH
+#define _IGNITE_ODBC_STREAMING_STREAMING_BATCH
+
+#include <string>
+
+#include "ignite/impl/interop/interop_memory.h"
+
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace app
+        {
+            // Forward declaration.
+            class ParameterSet;
+        }
+
+        namespace streaming
+        {
+            /**
+             * Streaming batch.
+             *
+             * Accumulates rows (see AddRow) into a single data buffer for streaming.
+             */
+            class StreamingBatch
+            {
+            public:
+                /**
+                 * Default constructor.
+                 */
+                StreamingBatch();
+
+                /**
+                 * Destructor.
+                 */
+                ~StreamingBatch();
+
+                /**
+                 * Add another row to a batch.
+                 *
+                 * @param sql SQL query.
+                 * @param params Parameter set.
+                 */
+                void AddRow(const std::string& sql, const app::ParameterSet& params);
+
+                /**
+                 * Clear the batch data.
+                 */
+                void Clear();
+
+                /**
+                 * Get pointer to the batch data.
+                 *
+                 * @return Pointer to the batch data.
+                 */
+                const int8_t* GetData() const
+                {
+                    return data.Data();
+                }
+
+                /**
+                 * Get length of the batch data.
+                 *
+                 * @return Length of the batch data.
+                 */
+                int32_t GetDataLength() const
+                {
+                    return data.Length();
+                }
+
+                /**
+                 * Get number of rows in batch.
+                 *
+                 * @return Number of rows in batch.
+                 */
+                int32_t GetSize() const
+                {
+                    return size;
+                }
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(StreamingBatch);
+
+                /** Current SQL. */
+                std::string currentSql;
+
+                /** Batch size in rows. */
+                int32_t size;
+
+                /** Batch data. */
+                impl::interop::InteropUnpooledMemory data;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_STREAMING_STREAMING_BATCH
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
new file mode 100644
index 0000000..2852596
--- /dev/null
+++ b/modules/platforms/cpp/odbc/include/ignite/odbc/streaming/streaming_context.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_ODBC_STREAMING_STREAMING_CONTEXT
+#define _IGNITE_ODBC_STREAMING_STREAMING_CONTEXT
+
+#include "ignite/odbc/query/query.h"
+#include "ignite/odbc/app/parameter_set.h"
+
+#include "ignite/odbc/streaming/streaming_batch.h"
+
+namespace ignite
+{
+    namespace odbc
+    {
+        /** Set streaming forward-declaration. */
+        class SqlSetStreamingCommand;
+
+        /** Connection forward-declaration. */
+        class Connection;
+
+        namespace streaming
+        {
+            /**
+             * Streaming context. Holds the streaming state and the current batch for a connection.
+             */
+            class StreamingContext
+            {
+            public:
+                /**
+                 * Default constructor.
+                 */
+                StreamingContext();
+
+                /**
+                 * Set connection for streaming. Only the address of the object is stored.
+                 *
+                 * @param connection Connection for streaming.
+                 */
+                void SetConnection(Connection& connection)
+                {
+                    this->connection = &connection;
+                }
+
+                /**
+                 * Destructor.
+                 */
+                ~StreamingContext();
+
+                /**
+                 * Enable streaming.
+                 *
+                 * @param cmd Set streaming command.
+                 * @return Result.
+                 */
+                SqlResult::Type Enable(const SqlSetStreamingCommand& cmd);
+
+                /**
+                 * Disable streaming.
+                 *
+                 * @return Result.
+                 */
+                SqlResult::Type Disable();
+
+                /**
+                 * Check if the streaming is enabled.
+                 *
+                 * @return @c true if enabled.
+                 */
+                bool IsEnabled() const
+                {
+                    return enabled;
+                }
+
+                /**
+                 * Execute query.
+                 *
+                 * @param sql SQL.
+                 * @param params SQL params.
+                 * @return Operation result.
+                 */
+                SqlResult::Type Execute(const std::string& sql, const app::ParameterSet& params);
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(StreamingContext);
+
+                /**
+                 * Flush collected streaming data to remote server.
+                 *
+                 * @param last Last page indicator.
+                 * @return Operation result.
+                 */
+                SqlResult::Type Flush(bool last);
+
+                /**
+                 * Send batch request.
+                 *
+                 * @param last Last page flag.
+                 * @return Result.
+                 */
+                SqlResult::Type MakeRequestStreamingBatch(bool last);
+
+                /** Connection used for streaming (set by SetConnection). */
+                Connection* connection;
+
+                /** Batch size. */
+                int32_t batchSize;
+
+                /** Order. NOTE(review): presumably the sequence number of the next batch - confirm. */
+                int64_t order;
+
+                /** Streaming enabled. */
+                bool enabled;
+
+                /** Current batch. */
+                StreamingBatch currentBatch;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_ODBC_STREAMING_STREAMING_CONTEXT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
index e3bbb4e..812d56f 100644
--- a/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
+++ b/modules/platforms/cpp/odbc/os/win/src/system/ui/dsn_configuration_window.cpp
@@ -160,17 +160,17 @@ namespace ignite const ProtocolVersion::VersionSet& supported = ProtocolVersion::GetSupported(); - ProtocolVersion version = ProtocolVersion::GetCurrent(); - ProtocolVersion::VersionSet::const_iterator it; - for (it = supported.begin(); it != supported.end(); ++it) + ProtocolVersion version = config.GetProtocolVersion(); + + if (!version.IsSupported()) + version = ProtocolVersion::GetCurrent(); + + for (ProtocolVersion::VersionSet::const_iterator it = supported.begin(); it != supported.end(); ++it) { protocolVersionComboBox->AddString(it->ToString()); - if (*it == config.GetProtocolVersion()) - { + if (*it == version) protocolVersionComboBox->SetSelection(id); - version = *it; - } ++id; }
@@ -296,7 +296,10 @@ namespace ignite int checkBoxSize = (sizeX - 3 * INTERVAL) / 2; - const ProtocolVersion version = config.GetProtocolVersion(); +
ProtocolVersion version = config.GetProtocolVersion(); + + if (!version.IsSupported()) + version = ProtocolVersion::GetCurrent(); int rowPos = posY + 2 * INTERVAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj index 630dc27..69318ef 100644 --- a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj +++ b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj @@ -191,14 +191,21 @@ <ClCompile Include="..\..\src\query\foreign_keys_query.cpp" /> <ClCompile Include="..\..\src\query\primary_keys_query.cpp" /> <ClCompile Include="..\..\src\query\special_columns_query.cpp" /> + <ClCompile Include="..\..\src\query\streaming_query.cpp" /> <ClCompile Include="..\..\src\query\table_metadata_query.cpp" /> <ClCompile Include="..\..\src\query\type_info_query.cpp" /> <ClCompile Include="..\..\src\result_page.cpp" /> <ClCompile Include="..\..\src\row.cpp" /> + <ClCompile Include="..\..\src\sql\sql_lexer.cpp" /> + <ClCompile Include="..\..\src\sql\sql_parser.cpp" /> + <ClCompile Include="..\..\src\sql\sql_set_streaming_command.cpp" /> + <ClCompile Include="..\..\src\sql\sql_utils.cpp" /> <ClCompile Include="..\..\src\ssl\secure_socket_client.cpp" /> <ClCompile Include="..\..\src\ssl\ssl_gateway.cpp" /> <ClCompile Include="..\..\src\ssl\ssl_mode.cpp" /> <ClCompile Include="..\..\src\statement.cpp" /> + <ClCompile Include="..\..\src\streaming\streaming_batch.cpp" /> + <ClCompile Include="..\..\src\streaming\streaming_context.cpp" /> <ClCompile Include="..\..\src\type_traits.cpp" /> <ClCompile Include="..\..\src\utility.cpp" /> <ClCompile Include="..\..\src\log.cpp" /> @@ -239,19 +246,29 @@ <ClInclude Include="..\..\include\ignite\odbc\query\data_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\column_metadata_query.h" /> 
<ClInclude Include="..\..\include\ignite\odbc\query\foreign_keys_query.h" /> + <ClInclude Include="..\..\include\ignite\odbc\query\internal_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\primary_keys_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\special_columns_query.h" /> + <ClInclude Include="..\..\include\ignite\odbc\query\streaming_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\table_metadata_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\query\type_info_query.h" /> <ClInclude Include="..\..\include\ignite\odbc\result_page.h" /> <ClInclude Include="..\..\include\ignite\odbc\row.h" /> <ClInclude Include="..\..\include\ignite\odbc\socket_client.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_command.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_lexer.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_parser.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_set_streaming_command.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_token.h" /> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_utils.h" /> <ClInclude Include="..\..\include\ignite\odbc\ssl\secure_socket_client.h" /> <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_bindings.h" /> <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_gateway.h" /> <ClInclude Include="..\..\include\ignite\odbc\ssl\ssl_mode.h" /> <ClInclude Include="..\..\include\ignite\odbc\statement.h" /> + <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_batch.h" /> + <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_context.h" /> <ClInclude Include="..\..\include\ignite\odbc\system\odbc_constants.h" /> <ClInclude Include="..\..\include\ignite\odbc\system\tcp_socket_client.h" /> <ClInclude Include="..\..\include\ignite\odbc\system\ui\dsn_configuration_window.h" /> 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters index 6da0111..f0e49b5 100644 --- a/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters +++ b/modules/platforms/cpp/odbc/project/vs/odbc.vcxproj.filters @@ -32,6 +32,12 @@ <Filter Include="Code\ssl"> <UniqueIdentifier>{857734a3-6b29-412a-b75e-7fcc9d3fef8c}</UniqueIdentifier> </Filter> + <Filter Include="Code\sql"> + <UniqueIdentifier>{7f412a6b-279a-4a22-b87f-eafb1545a8d0}</UniqueIdentifier> + </Filter> + <Filter Include="Code\streaming"> + <UniqueIdentifier>{a14eb935-4a95-4202-a83b-12e715cf1cf1}</UniqueIdentifier> + </Filter> </ItemGroup> <ItemGroup> <ClCompile Include="..\..\src\odbc.cpp"> @@ -169,6 +175,27 @@ <ClCompile Include="..\..\src\nested_tx_mode.cpp"> <Filter>Code</Filter> </ClCompile> + <ClCompile Include="..\..\src\sql\sql_lexer.cpp"> + <Filter>Code\sql</Filter> + </ClCompile> + <ClCompile Include="..\..\src\sql\sql_parser.cpp"> + <Filter>Code\sql</Filter> + </ClCompile> + <ClCompile Include="..\..\src\sql\sql_set_streaming_command.cpp"> + <Filter>Code\sql</Filter> + </ClCompile> + <ClCompile Include="..\..\src\sql\sql_utils.cpp"> + <Filter>Code\sql</Filter> + </ClCompile> + <ClCompile Include="..\..\src\streaming\streaming_batch.cpp"> + <Filter>Code\streaming</Filter> + </ClCompile> + <ClCompile Include="..\..\src\query\streaming_query.cpp"> + <Filter>Code\query</Filter> + </ClCompile> + <ClCompile Include="..\..\src\streaming\streaming_context.cpp"> + <Filter>Code\streaming</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <None Include="module.def"> @@ -335,5 +362,35 @@ <ClInclude Include="..\..\include\ignite\odbc\nested_tx_mode.h"> <Filter>Code</Filter> </ClInclude> + <ClInclude 
Include="..\..\include\ignite\odbc\sql\sql_lexer.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_parser.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_token.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_set_streaming_command.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_command.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\sql\sql_utils.h"> + <Filter>Code\sql</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\query\internal_query.h"> + <Filter>Code\query</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_batch.h"> + <Filter>Code\streaming</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\query\streaming_query.h"> + <Filter>Code\query</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\odbc\streaming\streaming_context.h"> + <Filter>Code\streaming</Filter> + </ClInclude> </ItemGroup> </Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp b/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp index 7f7c2f4..17165e2 100644 --- a/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp +++ b/modules/platforms/cpp/odbc/src/config/connection_string_parser.cpp @@ -65,7 +65,7 @@ namespace ignite // No-op. 
} - void ConnectionStringParser::ParseConnectionString(const char* str, size_t len, char delimeter, + void ConnectionStringParser::ParseConnectionString(const char* str, size_t len, char delimiter, diagnostic::DiagnosticRecordStorage* diag) { std::string connect_str(str, len); @@ -75,7 +75,7 @@ namespace ignite while (!connect_str.empty()) { - size_t attr_begin = connect_str.rfind(delimeter); + size_t attr_begin = connect_str.rfind(delimiter); if (attr_begin == std::string::npos) attr_begin = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index 5d01a0d..b580f12 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -63,9 +63,10 @@ namespace ignite autoCommit(true), parser(), config(), - info(config) + info(config), + streamingContext() { - // No-op. 
+ streamingContext.SetConnection(*this); } Connection::~Connection()
@@ -375,7 +376,7 @@ namespace ignite return config; } - bool Connection::IsAutoCommit() + bool Connection::IsAutoCommit() const { return autoCommit; }
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/message.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/message.cpp b/modules/platforms/cpp/odbc/src/message.cpp
index 5730842..587d253 100644
--- a/modules/platforms/cpp/odbc/src/message.cpp
+++ b/modules/platforms/cpp/odbc/src/message.cpp
@@ -18,6 +18,8 @@ #include "ignite/odbc/message.h" #include "ignite/odbc/utility.h" +#include "ignite/odbc/streaming/streaming_batch.h" + namespace { using namespace ignite;
@@ -123,7 +125,7 @@ namespace ignite writer.WriteBool(autoCommit); } - QueryExecuteBatchtRequest::QueryExecuteBatchtRequest(const std::string& schema, const std::string& sql, + QueryExecuteBatchRequest::QueryExecuteBatchRequest(const std::string& schema, const std::string& sql, const app::ParameterSet& params, SqlUlen begin, SqlUlen end, bool last, int32_t timeout, bool autoCommit) : schema(schema), sql(sql),
@@ -137,12 +139,12 @@ namespace ignite // No-op. } - QueryExecuteBatchtRequest::~QueryExecuteBatchtRequest() + QueryExecuteBatchRequest::~QueryExecuteBatchRequest() { // No-op. } - void QueryExecuteBatchtRequest::Write(impl::binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + void QueryExecuteBatchRequest::Write(impl::binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const { writer.WriteInt8(RequestType::EXECUTE_SQL_QUERY_BATCH);
@@ -262,6 +264,38 @@ namespace ignite writer.WriteInt32(pageSize); }
+        StreamingBatchRequest::StreamingBatchRequest(const std::string& schema,
+            const streaming::StreamingBatch& batch, bool last, int64_t order) :
+            schema(schema),
+            batch(batch),
+            last(last),
+            order(order)
+        {
+            // No-op.
+        }
+
+        StreamingBatchRequest::~StreamingBatchRequest()
+        {
+            // No-op.
+        }
+
+        void StreamingBatchRequest::Write(impl::binary::BinaryWriterImpl& writer, const ProtocolVersion&) const
+        {
+            writer.WriteInt8(RequestType::STREAMING_BATCH); // Request type goes first.
+
+            writer.WriteString(schema);
+
+            impl::interop::InteropOutputStream* stream = writer.GetStream(); // Used below for the raw batch data.
+
+            writer.WriteInt32(batch.GetSize()); // Number of rows in the batch.
+
+            if (batch.GetSize() != 0) // Batch data is written only for a non-empty batch.
+                stream->WriteInt8Array(batch.GetData(), batch.GetDataLength());
+
+            writer.WriteBool(last);
+            writer.WriteInt64(order);
+        }
+
 Response::Response() : status(ResponseStatus::UNKNOWN_ERROR), error()
@@ -358,7 +392,7 @@ namespace ignite ReadAffectedRows(reader, ver, affectedRows); } - QueryExecuteBatchResponse::QueryExecuteBatchResponse(): + QueryExecuteBatchResponse::QueryExecuteBatchResponse() : affectedRows(0), errorMessage(), errorCode(1)
@@ -388,6 +422,26 @@ namespace ignite } }
+        StreamingBatchResponse::StreamingBatchResponse() :
+            errorMessage(),
+            errorCode(ResponseStatus::SUCCESS),
+            order(0)
+        {
+            // No-op.
+        }
+
+        StreamingBatchResponse::~StreamingBatchResponse()
+        {
+            // No-op.
+        }
+
+        void StreamingBatchResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion&)
+        {
+            errorMessage = reader.ReadObject<std::string>(); // Read order: error message, error code, order.
+            errorCode = reader.ReadInt32();
+            order = reader.ReadInt64();
+        }
+
 QueryFetchResponse::QueryFetchResponse(ResultPage& resultPage) : queryId(0), resultPage(resultPage)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/query/batch_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
index 1256c94..741875a 100644
--- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp
+++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp
@@ -144,9 +144,8 @@ namespace ignite { const std::string& schema = connection.GetSchema(); - QueryExecuteBatchtRequest req(schema, sql, params, begin, end, last, timeout, + QueryExecuteBatchRequest req(schema, sql, params, begin, end, last, timeout, connection.IsAutoCommit()); - QueryExecuteBatchResponse rsp; try
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/query/streaming_query.cpp b/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
new file mode 100644
index 0000000..dd9302f
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/query/streaming_query.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/odbc/connection.h"
+#include "ignite/odbc/message.h"
+#include "ignite/odbc/log.h"
+#include "ignite/odbc/query/streaming_query.h"
+#include "ignite/odbc/sql/sql_set_streaming_command.h"
+
+
+namespace ignite
+{
+    namespace odbc
+    {
+        namespace query
+        {
+            StreamingQuery::StreamingQuery(diagnostic::Diagnosable& diag, Connection& connection,
+                const app::ParameterSet& params) :
+                Query(diag, QueryType::STREAMING),
+                connection(connection),
+                params(params)
+            {
+                // Nothing to do.
+            }
+
+            StreamingQuery::~StreamingQuery()
+            {
+                // Nothing to do.
+            }
+
+            SqlResult::Type StreamingQuery::Execute()
+            {
+                return connection.GetStreamingContext().Execute(sql, params);
+            }
+
+            const meta::ColumnMetaVector& StreamingQuery::GetMeta() const
+            {
+                // This query type exposes no columns: one empty vector is shared by all instances.
+                static meta::ColumnMetaVector noColumns;
+
+                return noColumns;
+            }
+
+            SqlResult::Type StreamingQuery::FetchNextRow(app::ColumnBindingMap&)
+            {
+                return SqlResult::AI_NO_DATA;
+            }
+
+            SqlResult::Type StreamingQuery::GetColumn(uint16_t, app::ApplicationDataBuffer&)
+            {
+                // There are never rows to read from, so any column request is reported as an error.
+                diag.AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, "Column is not available.");
+
+                return SqlResult::AI_ERROR;
+            }
+
+            SqlResult::Type StreamingQuery::Close()
+            {
+                return SqlResult::AI_SUCCESS;
+            }
+
+            bool StreamingQuery::DataAvailable() const
+            {
+                return false;
+            }
+
+            int64_t StreamingQuery::AffectedRows() const
+            {
+                return 0;
+            }
+
+            SqlResult::Type StreamingQuery::NextResultSet()
+            {
+                return SqlResult::AI_NO_DATA;
+            }
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp b/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
new file mode 100644
index 0000000..5422bc6
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_lexer.cpp
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cctype>
+
+#include "ignite/odbc/odbc_error.h"
+#include "ignite/odbc/sql/sql_lexer.h"
+
+
+namespace
+{
+    /**
+     * Check if the character only separates tokens.
+     *
+     * The character is converted to unsigned char before classification: passing a negative
+     * value other than EOF to the <cctype> functions is undefined behaviour, and plain char
+     * may be signed.
+     *
+     * @param c Character to check.
+     * @return @c true for whitespace and control characters.
+     */
+    inline bool IsBlank(char c)
+    {
+        const unsigned char uc = static_cast<unsigned char>(c);
+
+        return std::iscntrl(uc) != 0 || std::isspace(uc) != 0;
+    }
+}
+
+namespace ignite
+{
+    namespace odbc
+    {
+        SqlLexer::SqlLexer(const std::string& sql) :
+            sql(sql),
+            pos(0),
+            currentToken(0, 0, TokenType::EOD)
+        {
+            // No-op.
+        }
+
+        SqlLexer::~SqlLexer()
+        {
+            // No-op.
+        }
+
+        /*
+         * Move to the next token, skipping blanks and "--" comments on the way.
+         *
+         * Yields true when a token was read, false at the end of data, and an error for an
+         * unclosed quoted identifier or string literal.
+         */
+        OdbcExpected<bool> SqlLexer::Shift()
+        {
+            if (IsEod())
+            {
+                SetEod();
+
+                return false;
+            }
+
+            TokenType::Type tokenType = TokenType::EOD;
+
+            while (!IsEod())
+            {
+                int32_t tokenBegin = pos;
+
+                switch (sql[pos])
+                {
+                    case '-':
+                    {
+                        // Full-line comment.
+                        if (HaveData(1) && sql[pos + 1] == '-')
+                        {
+                            pos += 2;
+
+                            while (!IsEod() && sql[pos] != '\n' && sql[pos] != '\r')
+                                ++pos;
+
+                            continue;
+                        }
+
+                        // Minus.
+                        tokenType = TokenType::MINUS;
+
+                        break;
+                    }
+
+                    case '"':
+                    {
+                        // Quoted string.
+                        while (true)
+                        {
+                            ++pos;
+
+                            if (IsEod())
+                                return OdbcError(SqlState::SHY000_GENERAL_ERROR, "Unclosed quoted identifier.");
+
+                            if (sql[pos] == '"')
+                            {
+                                // A doubled quote is an escape only when at least one more
+                                // character follows it; otherwise this quote closes the token.
+                                if (!HaveData(2) || sql[pos + 1] != '"')
+                                    break;
+
+                                ++pos;
+                            }
+                        }
+
+                        tokenType = TokenType::QUOTED;
+
+                        break;
+                    }
+
+                    case '\'':
+                    {
+                        // String literal.
+                        while (true)
+                        {
+                            ++pos;
+
+                            if (IsEod())
+                                return OdbcError(SqlState::SHY000_GENERAL_ERROR, "Unclosed string literal.");
+
+                            if (sql[pos] == '\'')
+                            {
+                                // Same escape rule as for quoted identifiers.
+                                if (!HaveData(2) || sql[pos + 1] != '\'')
+                                    break;
+
+                                ++pos;
+                            }
+                        }
+
+                        tokenType = TokenType::STRING;
+
+                        break;
+                    }
+
+                    case '.':
+                    {
+                        tokenType = TokenType::DOT;
+
+                        break;
+                    }
+
+                    case ',':
+                    {
+                        tokenType = TokenType::COMMA;
+
+                        break;
+                    }
+
+                    case ';':
+                    {
+                        tokenType = TokenType::SEMICOLON;
+
+                        break;
+                    }
+
+                    case '(':
+                    {
+                        tokenType = TokenType::PARENTHESIS_LEFT;
+
+                        break;
+                    }
+
+                    case ')':
+                    {
+                        tokenType = TokenType::PARENTHESIS_RIGHT;
+
+                        break;
+                    }
+
+                    default:
+                    {
+                        // Skipping spaces.
+                        if (IsBlank(sql[pos]))
+                        {
+                            do
+                            {
+                                ++pos;
+                            }
+                            while (!IsEod() && IsBlank(sql[pos]));
+
+                            continue;
+                        }
+
+                        // Word.
+                        while (!IsEod() && !IsDelimiter(sql[pos]))
+                            ++pos;
+
+                        // A character that is neither blank, nor part of a word, nor one of the
+                        // tokens above (e.g. '*' or '=') leaves the position untouched. Consume
+                        // it as a one-character token, otherwise an empty token is produced and
+                        // every following Shift() call stalls at the same position.
+                        if (pos == tokenBegin)
+                            ++pos;
+
+                        --pos;
+
+                        tokenType = TokenType::WORD;
+
+                        break;
+                    }
+                }
+
+                ++pos;
+
+                if (tokenType != TokenType::EOD)
+                {
+                    currentToken = SqlToken(&sql[tokenBegin], pos - tokenBegin, tokenType);
+
+                    return true;
+                }
+            }
+
+            SetEod();
+
+            return false;
+        }
+
+        /*
+         * Shift and check that the new token has the given type and that its lower-cased text
+         * equals the expected string. A lexer error and the end of data both give false.
+         */
+        bool SqlLexer::ExpectNextToken(TokenType::Type typ, const char* expected)
+        {
+            OdbcExpected<bool> hasNext = Shift();
+
+            if (!hasNext.IsOk() || !*hasNext)
+                return false;
+
+            const SqlToken& token = GetCurrentToken();
+
+            return token.GetType() == typ && token.ToLower() == expected;
+        }
+
+        bool SqlLexer::IsEod() const
+        {
+            return pos >= static_cast<int32_t>(sql.size());
+        }
+
+        void SqlLexer::SetEod()
+        {
+            pos = static_cast<int32_t>(sql.size());
+
+            currentToken = SqlToken(0, 0, TokenType::EOD);
+        }
+
+        /*
+         * Tell whether a character exists at offset num from the current position.
+         */
+        bool SqlLexer::HaveData(int32_t num) const
+        {
+            return static_cast<size_t>(pos + num) < sql.size();
+        }
+
+        bool SqlLexer::IsDelimiter(int c)
+        {
+            // Callers pass plain char values, which are negative for bytes above 0x7F where
+            // char is signed; bring the value into the unsigned char range std::isalnum() needs.
+            const int ch = static_cast<unsigned char>(c);
+
+            return !std::isalnum(ch) && ch != '_';
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp b/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
new file mode 100644
index 0000000..d6b5d7d
--- /dev/null
+++ b/modules/platforms/cpp/odbc/src/sql/sql_parser.cpp
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/common/utils.h> + +#include <ignite/odbc/odbc_error.h> +#include <ignite/odbc/sql/sql_set_streaming_command.h> +#include <ignite/odbc/sql/sql_parser.h> + +const static std::string WORD_SET("set"); + +const static std::string WORD_STREAMING("streaming"); + +namespace ignite +{ + namespace odbc + { + SqlParser::SqlParser(const std::string& sql) : + lexer(sql) + { + // No-op. + } + + SqlParser::~SqlParser() + { + // No-op. + } + + std::auto_ptr<SqlCommand> SqlParser::GetNextCommand() + { + while (true) + { + if (!*lexer.Shift()) + return std::auto_ptr<SqlCommand>(); + + const SqlToken& token = lexer.GetCurrentToken(); + + switch (token.GetType()) + { + case TokenType::SEMICOLON: + // Empty command. Skipping. 
+ continue; + + case TokenType::WORD: + return ProcessCommand(); + + case TokenType::QUOTED: + case TokenType::MINUS: + case TokenType::DOT: + case TokenType::COMMA: + case TokenType::PARENTHESIS_LEFT: + case TokenType::PARENTHESIS_RIGHT: + default: + { + throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION, + "Unexpected token: '" + token.ToString() + "'"); + } + } + } + } + + std::auto_ptr<SqlCommand> SqlParser::ProcessCommand() + { + const SqlToken& token = lexer.GetCurrentToken(); + + if (WORD_SET == token.ToLower() && + *lexer.Shift() && + token.GetType() == TokenType::WORD && + WORD_STREAMING == token.ToLower()) + { + std::auto_ptr<SqlCommand> cmd(new SqlSetStreamingCommand); + + cmd->Parse(lexer); + + return cmd; + } + + throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION, + "Unexpected token: '" + token.ToString() + "'"); + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp b/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp new file mode 100644 index 0000000..5047917 --- /dev/null +++ b/modules/platforms/cpp/odbc/src/sql/sql_set_streaming_command.cpp @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/common/utils.h> + +#include <ignite/odbc/odbc_error.h> +#include <ignite/odbc/sql/sql_lexer.h> +#include <ignite/odbc/sql/sql_token.h> +#include <ignite/odbc/sql/sql_utils.h> +#include <ignite/odbc/sql/sql_set_streaming_command.h> + +const static std::string WORD_BATCH_SIZE("batch_size"); + +const static std::string WORD_PER_NODE_BUFFER_SIZE("per_node_buffer_size"); + +const static std::string WORD_PER_NODE_PARALLEL_OPERATIONS("per_node_parallel_operations"); + +const static std::string WORD_ALLOW_OVERWRITE("allow_overwrite"); + +const static std::string WORD_FLUSH_FREQUENCY("flush_frequency"); + +const static std::string WORD_ORDERED("ordered"); + +namespace ignite +{ + namespace odbc + { + SqlSetStreamingCommand::SqlSetStreamingCommand() : + SqlCommand(SqlCommandType::SET_STREAMING), + enabled(false), + allowOverwrite(false), + batchSize(DEFAULT_STREAM_BATCH_SIZE), + parallelOpsPerNode(0), + bufferSizePerNode(0), + flushFrequency(0), + ordered(false) + { + // No-op. + } + + SqlSetStreamingCommand::~SqlSetStreamingCommand() + { + // No-op. 
+ } + + void SqlSetStreamingCommand::Parse(SqlLexer& lexer) + { + enabled = ExpectBool(lexer); + + const SqlToken& token = lexer.GetCurrentToken(); + + while (*lexer.Shift() && token.GetType() != TokenType::SEMICOLON) + { + if (token.ToLower() == WORD_BATCH_SIZE) + { + CheckEnabled(token); + + batchSize = ExpectPositiveInteger(lexer, "batch size"); + + continue; + } + + if (token.ToLower() == WORD_PER_NODE_BUFFER_SIZE) + { + CheckEnabled(token); + + bufferSizePerNode = ExpectPositiveInteger(lexer, "per node buffer size"); + + continue; + } + + if (token.ToLower() == WORD_PER_NODE_PARALLEL_OPERATIONS) + { + CheckEnabled(token); + + parallelOpsPerNode = ExpectPositiveInteger(lexer, "per node parallel operations number"); + + continue; + } + + if (token.ToLower() == WORD_ALLOW_OVERWRITE) + { + CheckEnabled(token); + + allowOverwrite = ExpectBool(lexer); + + continue; + } + + if (token.ToLower() == WORD_FLUSH_FREQUENCY) + { + CheckEnabled(token); + + flushFrequency = ExpectPositiveInteger(lexer, "flush frequency"); + + continue; + } + + if (token.ToLower() == WORD_ORDERED) + { + CheckEnabled(token); + + ordered = true; + + continue; + } + + ThrowUnexpectedTokenError(token, "additional parameter of SET STREAMING command or semicolon"); + } + } + + void SqlSetStreamingCommand::CheckEnabled(const SqlToken& token) const + { + if (!enabled) + ThrowUnexpectedTokenError(token, "no parameters with STREAMING OFF command"); + } + + void SqlSetStreamingCommand::ThrowUnexpectedTokenError(const SqlToken& token, const std::string& expected) + { + throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION, + "Unexpected token: '" + token.ToString() + "', " + expected + " expected."); + } + + void SqlSetStreamingCommand::ThrowUnexpectedEndOfStatement(const std::string& expected) + { + throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION, + "Unexpected end of statement: " + expected + " expected."); + } + + int32_t SqlSetStreamingCommand::ExpectInt(SqlLexer& lexer) 
+ { + if (!*lexer.Shift()) + ThrowUnexpectedEndOfStatement("integer number"); + + const SqlToken& token = lexer.GetCurrentToken(); + + int sign = 1; + + if (token.GetType() == TokenType::MINUS) + { + sign = -1; + + if (!*lexer.Shift()) + ThrowUnexpectedEndOfStatement("integer number"); + } + + std::string str = token.ToString(); + + if (token.GetType() == TokenType::WORD && + common::AllOf(str.begin(), str.end(), isdigit)) + { + int64_t val = sign * common::LexicalCast<int64_t>(str); + + if (val >= INT32_MIN && val <= INT32_MAX) + return static_cast<int32_t>(val); + } + + ThrowUnexpectedTokenError(token, "integer number"); + + return 0; + } + + int32_t SqlSetStreamingCommand::ExpectPositiveInteger(SqlLexer& lexer, const std::string& description) + { + int32_t val = ExpectInt(lexer); + + if (val <= 0) + { + throw OdbcError(SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION, + "Invalid " + description + " - positive integer number is expected."); + } + + return val; + } + + bool SqlSetStreamingCommand::ExpectBool(SqlLexer& lexer) + { + const SqlToken& token = lexer.GetCurrentToken(); + // Running out of input is reported the same way ExpectInt reports it; there is no offending token to quote. + if (!*lexer.Shift()) + ThrowUnexpectedEndOfStatement("ON or OFF"); + + return *sql_utils::TokenToBoolean(token); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp b/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp new file mode 100644 index 0000000..2e7066f --- /dev/null +++ b/modules/platforms/cpp/odbc/src/sql/sql_utils.cpp @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/odbc/odbc_error.h> + +#include <ignite/odbc/sql/sql_lexer.h> +#include <ignite/odbc/sql/sql_utils.h> + +namespace ignite +{ + namespace odbc + { + namespace sql_utils + { + bool IsInternalCommand(const std::string& sql) + { + SqlLexer lexer(sql); + + while (true) + { + OdbcExpected<bool> hasNext = lexer.Shift(); + + if (!hasNext.IsOk() || !*hasNext) + return false; + + const SqlToken& token = lexer.GetCurrentToken(); + + if (token.GetType() != TokenType::SEMICOLON) + { + if (token.GetType() == TokenType::WORD && token.ToLower() == "set") + break; + + return false; + } + } + + return lexer.ExpectNextToken(TokenType::WORD, "streaming"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/statement.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/statement.cpp b/modules/platforms/cpp/odbc/src/statement.cpp index fd2d4ec..8ba0902 100644 --- a/modules/platforms/cpp/odbc/src/statement.cpp +++ b/modules/platforms/cpp/odbc/src/statement.cpp @@ -26,12 +26,17 @@ #include "ignite/odbc/query/primary_keys_query.h" #include "ignite/odbc/query/type_info_query.h" #include "ignite/odbc/query/special_columns_query.h" +#include "ignite/odbc/query/streaming_query.h" +#include "ignite/odbc/query/internal_query.h" #include "ignite/odbc/connection.h" #include 
"ignite/odbc/utility.h" #include "ignite/odbc/message.h" #include "ignite/odbc/statement.h" #include "ignite/odbc/log.h" #include "ignite/odbc/odbc_error.h" +#include "ignite/odbc/sql/sql_utils.h" +#include "ignite/odbc/sql/sql_parser.h" +#include "ignite/odbc/sql/sql_set_streaming_command.h" namespace ignite { @@ -281,7 +286,17 @@ namespace ignite case SQL_ATTR_PARAMSET_SIZE: { - parameters.SetParamSetSize(reinterpret_cast<SqlUlen>(value)); + SqlUlen size = reinterpret_cast<SqlUlen>(value); + + if (size > 1 && IsStreamingActive()) + { + AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, + "Batching is not supported in streaming mode."); + + return SqlResult::AI_ERROR; + } + + parameters.SetParamSetSize(size); break; } @@ -558,14 +573,58 @@ namespace ignite IGNITE_ODBC_API_CALL(InternalPrepareSqlQuery(query)); } + SqlResult::Type Statement::ProcessInternalCommand(const std::string& query) + { + try + { + SqlParser parser(query); + + std::auto_ptr<SqlCommand> cmd = parser.GetNextCommand(); + + assert(cmd.get() != 0); + + parameters.Prepare(); + + currentQuery.reset(new query::InternalQuery(*this, query, cmd)); + + return SqlResult::AI_SUCCESS; + } + catch (const OdbcError& err) + { + AddStatusRecord(err); + + return SqlResult::AI_ERROR; + } + } + + bool Statement::IsStreamingActive() const + { + return connection.GetStreamingContext().IsEnabled(); + } + SqlResult::Type Statement::InternalPrepareSqlQuery(const std::string& query) { - if (currentQuery.get()) - currentQuery->Close(); + if (sql_utils::IsInternalCommand(query)) + return ProcessInternalCommand(query); // Resetting parameters types as we are changing the query. 
parameters.Prepare(); + if (IsStreamingActive()) + { + if (!currentQuery.get()) + currentQuery.reset(new query::StreamingQuery(*this, connection, parameters)); + + query::StreamingQuery* currentQuery0 = static_cast<query::StreamingQuery*>(currentQuery.get()); + + currentQuery0->PrepareQuery(query); + + return SqlResult::AI_SUCCESS; + } + + if (currentQuery.get()) + currentQuery->Close(); + currentQuery.reset(new query::DataQuery(*this, connection, query, parameters, timeout)); return SqlResult::AI_SUCCESS; @@ -600,6 +659,13 @@ namespace ignite return SqlResult::AI_ERROR; } + if (currentQuery->GetType() == query::QueryType::INTERNAL) + { + ProcessInternalQuery(); + + return SqlResult::AI_SUCCESS; + } + if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == query::QueryType::DATA) { query::DataQuery& qry = static_cast<query::DataQuery&>(*currentQuery); @@ -613,12 +679,21 @@ namespace ignite currentQuery.reset(new query::DataQuery(*this, connection, qry.GetSql(), parameters, timeout)); } + if (parameters.GetParamSetSize() > 1 && currentQuery->GetType() == query::QueryType::STREAMING) + { + AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, + "Batching is not supported in streaming mode."); + + return SqlResult::AI_ERROR; + } + if (parameters.IsDataAtExecNeeded()) { - if (currentQuery->GetType() == query::QueryType::BATCH) + if (currentQuery->GetType() == query::QueryType::BATCH || + currentQuery->GetType() == query::QueryType::STREAMING) { AddStatusRecord(SqlState::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED, - "Data-at-execution is not supported together with batching."); + "Data-at-execution is not supported with batching."); return SqlResult::AI_ERROR; } @@ -629,6 +704,43 @@ namespace ignite return currentQuery->Execute(); } + SqlResult::Type Statement::ProcessInternalQuery() + { + assert(currentQuery->GetType() == query::QueryType::INTERNAL); + + query::InternalQuery* qry = static_cast<query::InternalQuery*>(currentQuery.get()); + + 
LOG_MSG("Processing internal query: " << qry->GetQuery()); + + assert(qry->GetCommand().GetType() == SqlCommandType::SET_STREAMING); + + SqlSetStreamingCommand& cmd = static_cast<SqlSetStreamingCommand&>(qry->GetCommand()); + + StopStreaming(); + + if (!cmd.IsEnabled()) + return SqlResult::AI_SUCCESS; + + LOG_MSG("Sending start streaming command"); + + query::DataQuery enablingQuery(*this, connection, qry->GetQuery(), parameters, timeout); + + SqlResult::Type res = enablingQuery.Execute(); + + if (res != SqlResult::AI_SUCCESS) + return res; + + LOG_MSG("Preparing streaming context on client"); + + connection.GetStreamingContext().Enable(cmd); + + std::auto_ptr<query::Query> newQry(new query::StreamingQuery(*this, connection, parameters)); + + std::swap(currentQuery, newQry); + + return SqlResult::AI_SUCCESS; + } + void Statement::ExecuteGetColumnsMetaQuery(const std::string& schema, const std::string& table, const std::string& column) { @@ -825,6 +937,18 @@ namespace ignite return result; } + SqlResult::Type Statement::StopStreaming() + { + if (!IsStreamingActive()) + return SqlResult::AI_SUCCESS; + + LOG_MSG("Stopping streaming"); + + SqlResult::Type result = connection.GetStreamingContext().Disable(); + + return result; + } + void Statement::FetchScroll(int16_t orientation, int64_t offset) { IGNITE_ODBC_API_CALL(InternalFetchScroll(orientation, offset)); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp b/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp new file mode 100644 index 0000000..2cdb281 --- /dev/null +++ b/modules/platforms/cpp/odbc/src/streaming/streaming_batch.cpp @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/interop/interop_output_stream.h" +#include "ignite/impl/binary/binary_writer_impl.h" + +#include "ignite/odbc/app/parameter_set.h" +#include "ignite/odbc/streaming/streaming_batch.h" + +namespace ignite +{ + namespace odbc + { + namespace streaming + { + StreamingBatch::StreamingBatch() : + currentSql(), + size(0), + data(1024 * 16) + { + // No-op. + } + + StreamingBatch::~StreamingBatch() + { + // No-op. 
+ } + + void StreamingBatch::AddRow(const std::string& sql, const app::ParameterSet& params) + { + impl::interop::InteropOutputStream out(&data); + + out.Position(data.Length()); + + impl::binary::BinaryWriterImpl writer(&out, 0); + + if (currentSql != sql) + { + currentSql = sql; + + writer.WriteString(sql); + } + else + writer.WriteNull(); + + params.Write(writer); + ++size; + + out.Synchronize(); + } + + void StreamingBatch::Clear() + { + currentSql.clear(); + + size = 0; + + data.Length(0); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp b/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp new file mode 100644 index 0000000..6953a3b --- /dev/null +++ b/modules/platforms/cpp/odbc/src/streaming/streaming_context.cpp @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ignite/odbc/connection.h" +#include "ignite/odbc/message.h" +#include "ignite/odbc/log.h" +#include "ignite/odbc/sql/sql_set_streaming_command.h" + +#include "ignite/odbc/streaming/streaming_context.h" + +namespace ignite +{ + namespace odbc + { + namespace streaming + { + StreamingContext::StreamingContext() : + connection(0), + batchSize(0), + order(0), + enabled(false), + currentBatch() + { + // No-op. + } + + StreamingContext::~StreamingContext() + { + // No-op. + } + + SqlResult::Type StreamingContext::Enable(const SqlSetStreamingCommand& cmd) + { + SqlResult::Type res = SqlResult::AI_SUCCESS; + + if (enabled) + res = Disable(); + + if (res != SqlResult::AI_SUCCESS) + return res; + + batchSize = cmd.GetBatchSize(); + + enabled = true; + + return SqlResult::AI_SUCCESS; + } + + SqlResult::Type StreamingContext::Disable() + { + LOG_MSG("Disabling streaming context."); + + SqlResult::Type res = SqlResult::AI_SUCCESS; + + if (enabled) + res = Flush(true); + + enabled = false; + + return res; + } + + SqlResult::Type StreamingContext::Execute(const std::string& sql, const app::ParameterSet& params) + { + assert(enabled); + + currentBatch.AddRow(sql, params); + + if (currentBatch.GetSize() < batchSize) + return SqlResult::AI_SUCCESS; + + return Flush(false); + } + + SqlResult::Type StreamingContext::Flush(bool last) + { + LOG_MSG("Flushing data"); + + if (currentBatch.GetSize() == 0 && !last) + return SqlResult::AI_SUCCESS; + + SqlResult::Type res = MakeRequestStreamingBatch(last); + + currentBatch.Clear(); + + return res; + } + + SqlResult::Type StreamingContext::MakeRequestStreamingBatch(bool last) + { + assert(connection != 0); + + const std::string& schema = connection->GetSchema(); + + StreamingBatchRequest req(schema, currentBatch, last, order); + StreamingBatchResponse rsp; + + try + { + connection->SyncMessage(req, rsp); + } + catch (const OdbcError& err) + { + connection->AddStatusRecord(err); + + return SqlResult::AI_ERROR; + } + catch 
(const IgniteError& err) + { + connection->AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, err.GetText()); + + return SqlResult::AI_ERROR; + } + + currentBatch.Clear(); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + { + LOG_MSG("Error: " << rsp.GetError()); + + connection->AddStatusRecord(ResponseStatusToSqlState(rsp.GetStatus()), rsp.GetError()); + + return SqlResult::AI_ERROR; + } + + if (rsp.GetErrorCode() != ResponseStatus::SUCCESS) + { + LOG_MSG("Error: " << rsp.GetErrorMessage()); + + connection->AddStatusRecord(ResponseStatusToSqlState(rsp.GetErrorCode()), rsp.GetErrorMessage()); + + return SqlResult::AI_ERROR; + } + + assert(order == rsp.GetOrder()); + + ++order; + + return SqlResult::AI_SUCCESS; + } + } + } +} +