http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/queries_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp index 471f7d7..cfc3fad 100644 --- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp @@ -69,7 +69,7 @@ struct QueriesTestSuiteFixture : odbc::OdbcTestSuite cache1(0), cache2(0) { - grid = StartTestNode("queries-test.xml", "NodeMain"); + grid = StartPlatformNode("queries-test.xml", "NodeMain"); cache1 = grid.GetCache<int64_t, TestType>("cache"); cache2 = grid.GetCache<int64_t, ComplexType>("cache2"); @@ -224,7 +224,7 @@ struct QueriesTestSuiteFixture : odbc::OdbcTestSuite static Ignite StartAdditionalNode(const char* name) { - return StartTestNode("queries-test.xml", name); + return StartPlatformNode("queries-test.xml", name); } /** Node started during the test. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp b/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp index 7842704..d8ed087 100644 --- a/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/sql_get_info_test.cpp @@ -87,7 +87,7 @@ struct SqlGetInfoTestSuiteFixture : odbc::OdbcTestSuite */ SqlGetInfoTestSuiteFixture() { - grid = StartTestNode("queries-test.xml", "NodeMain"); + grid = StartPlatformNode("queries-test.xml", "NodeMain"); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/sql_parsing_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/sql_parsing_test.cpp b/modules/platforms/cpp/odbc-test/src/sql_parsing_test.cpp new file mode 100644 index 0000000..ac2e24d --- /dev/null +++ b/modules/platforms/cpp/odbc-test/src/sql_parsing_test.cpp @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _MSC_VER +# define BOOST_TEST_DYN_LINK +#endif + +#include <boost/test/unit_test.hpp> + +#include <string> + +#include <ignite/ignite.h> +#include <ignite/ignition.h> + +#include <ignite/odbc/odbc_error.h> +#include <ignite/odbc/sql/sql_lexer.h> +#include <ignite/odbc/sql/sql_parser.h> +#include <ignite/odbc/sql/sql_utils.h> +#include <ignite/odbc/sql/sql_set_streaming_command.h> + +#include "test_utils.h" + + +using namespace ignite; +using namespace ignite_test; + +using namespace boost::unit_test; + +void CheckNextToken(odbc::SqlLexer& lexer, odbc::TokenType::Type tokenType, const std::string& expected) +{ + BOOST_REQUIRE(!lexer.IsEod()); + + odbc::OdbcExpected<bool> hasNext = lexer.Shift(); + + if (!hasNext.IsOk()) + BOOST_FAIL(hasNext.GetError().GetErrorMessage()); + + BOOST_CHECK(*hasNext); + + const odbc::SqlToken& token = lexer.GetCurrentToken(); + + BOOST_REQUIRE(token.GetValue()); + BOOST_CHECK_GT(token.GetSize(), 0); + BOOST_CHECK_EQUAL(token.GetType(), tokenType); + BOOST_CHECK_EQUAL(token.ToString(), expected); +} + +void CheckSetStreamingCommand( + odbc::SqlParser& parser, + bool enabled, + bool allowOverwrite, + int32_t batchSize, + int32_t bufferSizePerNode, + int32_t parallelOperationsPerNode, + int64_t flushFrequency, + bool ordered) +{ + std::auto_ptr<odbc::SqlCommand> cmd = parser.GetNextCommand(); + + BOOST_REQUIRE(cmd.get() != 0); + BOOST_REQUIRE_EQUAL(cmd->GetType(), odbc::SqlCommandType::SET_STREAMING); + + odbc::SqlSetStreamingCommand& cmd0 = static_cast<odbc::SqlSetStreamingCommand&>(*cmd); + + BOOST_CHECK_EQUAL(cmd0.IsEnabled(), enabled); + BOOST_CHECK_EQUAL(cmd0.IsAllowOverwrite(), allowOverwrite); + BOOST_CHECK_EQUAL(cmd0.GetBatchSize(), batchSize); + BOOST_CHECK_EQUAL(cmd0.GetBufferSizePerNode(), bufferSizePerNode); + BOOST_CHECK_EQUAL(cmd0.GetParallelOperationsPerNode(), parallelOperationsPerNode); + BOOST_CHECK_EQUAL(cmd0.GetFlushFrequency(), flushFrequency); + BOOST_CHECK_EQUAL(cmd0.IsOrdered(), ordered); +} + +void 
CheckSingleSetStreamingCommand( + const std::string& sql, + bool enabled, + bool allowOverwrite, + int32_t batchSize, + int32_t bufferSizePerNode, + int32_t parallelOperationsPerNode, + int64_t flushFrequency, + bool ordered) +{ + BOOST_CHECK(odbc::sql_utils::IsInternalCommand(sql)); + + odbc::SqlParser parser(sql); + + CheckSetStreamingCommand(parser, enabled, allowOverwrite, batchSize, bufferSizePerNode, + parallelOperationsPerNode, flushFrequency, ordered); + + std::auto_ptr<odbc::SqlCommand> cmd = parser.GetNextCommand(); + BOOST_CHECK(cmd.get() == 0); +} + +void CheckUnexpectedTokenError(const std::string& sql, const std::string& token, const std::string& expected = "additional parameter of SET STREAMING command or semicolon") +{ + odbc::SqlParser parser(sql); + + try + { + parser.GetNextCommand(); + + BOOST_FAIL("Exception expected."); + } + catch (const odbc::OdbcError& err) + { + std::string expErr = "Unexpected token: '" + token + "'"; + + if (!expected.empty()) + expErr += ", " + expected + " expected."; + + BOOST_CHECK_EQUAL(err.GetStatus(), odbc::SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION); + BOOST_CHECK_EQUAL(err.GetErrorMessage(), expErr); + } +} + +void CheckUnexpectedEndOfStatement(const std::string& sql, const std::string& expected) +{ + odbc::SqlParser parser(sql); + + try + { + parser.GetNextCommand(); + + BOOST_FAIL("Exception expected."); + } + catch (const odbc::OdbcError& err) + { + std::string expErr = "Unexpected end of statement: " + expected + " expected."; + + BOOST_CHECK_EQUAL(err.GetStatus(), odbc::SqlState::S42000_SYNTAX_ERROR_OR_ACCESS_VIOLATION); + BOOST_CHECK_EQUAL(err.GetErrorMessage(), expErr); + } +} + +BOOST_AUTO_TEST_SUITE(SqlParsingTestSuite) + +BOOST_AUTO_TEST_CASE(LexerTokens) +{ + std::string sql( + "word " + "otherword " + "and " + "another " + "\"quoted\" " + "\"quoted with \"\"qoutes\"\" inside \" " + "'string' " + "'string with \"quotes\"' " + " \n \r \t " + "'string with ''string quotes'' inside' " + "some.val 
" + "three,vals,here " + "\"Lorem;,.\";'Ipsum,;.--' " + "(something) " + "-- comment " + "still comment \n" + "-5 end"); + + odbc::SqlLexer lexer(sql); + + CheckNextToken(lexer, odbc::TokenType::WORD, "word"); + CheckNextToken(lexer, odbc::TokenType::WORD, "otherword"); + CheckNextToken(lexer, odbc::TokenType::WORD, "and"); + CheckNextToken(lexer, odbc::TokenType::WORD, "another"); + CheckNextToken(lexer, odbc::TokenType::QUOTED, "\"quoted\""); + CheckNextToken(lexer, odbc::TokenType::QUOTED, "\"quoted with \"\"qoutes\"\" inside \""); + CheckNextToken(lexer, odbc::TokenType::STRING, "'string'"); + CheckNextToken(lexer, odbc::TokenType::STRING, "'string with \"quotes\"'"); + CheckNextToken(lexer, odbc::TokenType::STRING, "'string with ''string quotes'' inside'"); + CheckNextToken(lexer, odbc::TokenType::WORD, "some"); + CheckNextToken(lexer, odbc::TokenType::DOT, "."); + CheckNextToken(lexer, odbc::TokenType::WORD, "val"); + CheckNextToken(lexer, odbc::TokenType::WORD, "three"); + CheckNextToken(lexer, odbc::TokenType::COMMA, ","); + CheckNextToken(lexer, odbc::TokenType::WORD, "vals"); + CheckNextToken(lexer, odbc::TokenType::COMMA, ","); + CheckNextToken(lexer, odbc::TokenType::WORD, "here"); + CheckNextToken(lexer, odbc::TokenType::QUOTED, "\"Lorem;,.\""); + CheckNextToken(lexer, odbc::TokenType::SEMICOLON, ";"); + CheckNextToken(lexer, odbc::TokenType::STRING, "'Ipsum,;.--'"); + CheckNextToken(lexer, odbc::TokenType::PARENTHESIS_LEFT, "("); + CheckNextToken(lexer, odbc::TokenType::WORD, "something"); + CheckNextToken(lexer, odbc::TokenType::PARENTHESIS_RIGHT, ")"); + CheckNextToken(lexer, odbc::TokenType::MINUS, "-"); + CheckNextToken(lexer, odbc::TokenType::WORD, "5"); + CheckNextToken(lexer, odbc::TokenType::WORD, "end"); + + BOOST_REQUIRE(lexer.IsEod()); + + odbc::OdbcExpected<bool> hasNext = lexer.Shift(); + + if (!hasNext.IsOk()) + BOOST_FAIL(hasNext.GetError().GetErrorMessage()); + + BOOST_CHECK(!*hasNext); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOff) 
+{ + CheckSingleSetStreamingCommand("set streaming off", false, false, 2048, 0, 0, 0, false); + CheckSingleSetStreamingCommand("set streaming 0", false, false, 2048, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOffMixedCase) +{ + CheckSingleSetStreamingCommand("SET Streaming OfF", false, false, 2048, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOn) +{ + CheckSingleSetStreamingCommand("set streaming on", true, false, 2048, 0, 0, 0, false); + CheckSingleSetStreamingCommand("set streaming 1", true, false, 2048, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnAllowOverwriteOn) +{ + CheckSingleSetStreamingCommand("set streaming on allow_overwrite on", true, true, 2048, 0, 0, 0, false); + CheckSingleSetStreamingCommand("set streaming on allow_overwrite 1", true, true, 2048, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnAllowOverwriteOff) +{ + CheckSingleSetStreamingCommand("set streaming on allow_overwrite off", true, false, 2048, 0, 0, 0, false); + CheckSingleSetStreamingCommand("set streaming on allow_overwrite 0", true, false, 2048, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnPageSize512) +{ + CheckSingleSetStreamingCommand("set streaming on batch_size 512", true, false, 512, 0, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnPerNodeBufferSize500) +{ + CheckSingleSetStreamingCommand("set streaming on per_node_buffer_size 500", true, false, 2048, 500, 0, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnPerNodeParallelOperations4) +{ + CheckSingleSetStreamingCommand("set streaming on per_node_parallel_operations 4", true, false, 2048, 0, 4, 0, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnFlushFrequency100) +{ + CheckSingleSetStreamingCommand("set streaming on flush_frequency 100", true, false, 2048, 0, 0, 100, false); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnOrdered) +{ + CheckSingleSetStreamingCommand("set streaming on ordered", true, false, 
2048, 0, 0, 0, true); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnAll) +{ + CheckSingleSetStreamingCommand( + "set streaming 1 " + "allow_overwrite on " + "batch_size 512 " + "per_node_buffer_size 500 " + "per_node_parallel_operations 4 " + "flush_frequency 100 " + "ordered", + true, true, 512, 500, 4, 100, true); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnAllDisorder) +{ + CheckSingleSetStreamingCommand( + "set streaming 1 " + "batch_size 512 " + "allow_overwrite on " + "ordered " + "per_node_buffer_size 500 " + "flush_frequency 100 " + "per_node_parallel_operations 4 ", + true, true, 512, 500, 4, 100, true); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnAllBackward) +{ + CheckSingleSetStreamingCommand( + "set streaming 1 " + "ordered " + "flush_frequency 100 " + "per_node_parallel_operations 4 " + "per_node_buffer_size 500 " + "batch_size 512 " + "allow_overwrite on ", + true, true, 512, 500, 4, 100, true); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnOff) +{ + std::string sql("set streaming 1; set streaming off"); + + odbc::SqlParser parser(sql); + + CheckSetStreamingCommand(parser, true, false, 2048, 0, 0, 0, false); + CheckSetStreamingCommand(parser, false, false, 2048, 0, 0, 0, false); + + std::auto_ptr<odbc::SqlCommand> cmd = parser.GetNextCommand(); + + BOOST_CHECK(cmd.get() == 0); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOnUnexpectedTokenError) +{ + CheckUnexpectedTokenError("set streaming 1 ololo", "ololo"); + CheckUnexpectedTokenError("set streaming 1 ordered lorem_ipsum", "lorem_ipsum"); + CheckUnexpectedTokenError("set streaming ON lorem_ipsum ordered", "lorem_ipsum"); + + CheckUnexpectedTokenError("set streaming some", "some", "ON, OFF, 1 or 0"); + CheckUnexpectedTokenError("some", "some", ""); +} + +BOOST_AUTO_TEST_CASE(ParserSetStreamingOffUnexpectedTokenError) +{ + CheckUnexpectedTokenError("set streaming 0 ololo", "ololo"); + CheckUnexpectedTokenError("set streaming 0 ordered", "ordered", "no parameters with STREAMING OFF command"); 
+ CheckUnexpectedTokenError("set streaming OFF lorem_ipsum ordered", "lorem_ipsum"); +} + +BOOST_AUTO_TEST_CASE(ParserInternalCommand) +{ + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming 1")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming 0")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming on")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming off")); + + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("SET STREAMING 1")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("SET STREAMING 0")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("SET STREAMING ON")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("SET STREAMING OFF")); + + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("Set Streaming 1")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("Set Streaming 0")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("Set Streaming On")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("Set Streaming Off")); + + BOOST_CHECK(odbc::sql_utils::IsInternalCommand(";SET STREAMING 1")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand(";;SET STREAMING 0")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand(";;;SET STREAMING ON")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand(";;;;SET STREAMING OFF")); + + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming")); + BOOST_CHECK(odbc::sql_utils::IsInternalCommand("set streaming blah")); +} + +BOOST_AUTO_TEST_CASE(ParserNonInternalCommand) +{ + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand("")); + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand("Blah")); + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand("0")); + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand(";")); + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand("Lorem ipsum")); + BOOST_CHECK(!odbc::sql_utils::IsInternalCommand("set some")); +} + +BOOST_AUTO_TEST_SUITE_END() 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp b/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp index 31d6717..15c63a4 100644 --- a/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp +++ b/modules/platforms/cpp/odbc-test/src/sql_test_suite_fixture.cpp @@ -29,11 +29,7 @@ namespace ignite dbc(NULL), stmt(NULL) { -#ifdef IGNITE_TESTS_32 - grid = StartNode("queries-test-32.xml"); -#else - grid = StartNode("queries-test.xml"); -#endif + grid = StartPlatformNode("queries-test.xml", "NodeMain"); testCache = grid.GetCache<int64_t, TestType>("cache"); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/streaming_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/streaming_test.cpp b/modules/platforms/cpp/odbc-test/src/streaming_test.cpp new file mode 100644 index 0000000..6a709ba --- /dev/null +++ b/modules/platforms/cpp/odbc-test/src/streaming_test.cpp @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +# include <windows.h> +#endif + +#include <sql.h> +#include <sqlext.h> + +#include <vector> +#include <string> + +#ifndef _MSC_VER +# define BOOST_TEST_DYN_LINK +#endif + +#include <boost/test/unit_test.hpp> + +#include "ignite/ignite.h" +#include "ignite/ignition.h" +#include "ignite/impl/binary/binary_utils.h" + +#include "test_type.h" +#include "test_utils.h" +#include "odbc_test_suite.h" +#include "ignite/odbc/socket_client.h" +#include <boost/thread/v2/thread.hpp> + +using namespace ignite; +using namespace ignite::common; +using namespace ignite_test; + +using namespace boost::unit_test; + +using ignite::impl::binary::BinaryUtils; + +/** + * Test setup fixture. + */ +struct StreamingTestSuiteFixture : odbc::OdbcTestSuite +{ + /** + * Constructor. + */ + StreamingTestSuiteFixture() : + grid(StartPlatformNode("queries-test.xml", "NodeMain")), + cache(grid.GetCache<int32_t, TestType>("cache")) + { + // No-op. + } + + /** + * Destructor. 
+ */ + ~StreamingTestSuiteFixture() + { + Ignition::StopAll(true); + } + + void InsertTestStrings(int32_t begin, int32_t end) + { + InsertTestStrings(stmt, begin, end); + } + + void InsertTestStrings2(int32_t begin, int32_t end) + { + InsertTestStrings2(stmt, begin, end); + } + + void InsertTestStrings(SQLHSTMT stmt0, int32_t begin, int32_t end) + { + SQLCHAR req[] = "INSERT INTO TestType(_key, strField) VALUES(?, ?)"; + + InsertTestStrings(req, stmt0, begin, end); + } + + void InsertTestStrings2(SQLHSTMT stmt0, int32_t begin, int32_t end) + { + SQLCHAR req[] = "INSERT INTO TestType(_key, i32Field, strField) VALUES(?, 42, ?)"; + + InsertTestStrings(req, stmt0, begin, end); + } + + void InsertTestStrings(SQLCHAR* req, SQLHSTMT stmt0, int32_t begin, int32_t end) + { + SQLRETURN ret = SQLPrepare(stmt0, req, SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + int64_t key = 0; + char strField[1024] = {0}; + SQLLEN strFieldLen = 0; + + // Binding parameters. + ret = SQLBindParameter(stmt0, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &key, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0)); + + ret = SQLBindParameter(stmt0, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(strField), + sizeof(strField), &strField, sizeof(strField), &strFieldLen); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0)); + + // Inserting values. + for (int32_t i = begin; i < end; ++i) + { + key = i; + std::string val = getTestString(i); + + strncpy(strField, val.c_str(), sizeof(strField)); + strFieldLen = SQL_NTS; + + ret = SQLExecute(stmt0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0)); + } + + // Resetting parameters. 
+ ret = SQLFreeStmt(stmt0, SQL_RESET_PARAMS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt0)); + } + + int32_t GetI32Field(int32_t key) + { + SQLCHAR req[] = "SELECT i32Field FROM TestType WHERE _key = ?"; + + SQLRETURN ret = SQLPrepare(stmt, req, SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + int64_t p1 = key; + + // Binding parameters. + ret = SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p1, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + int32_t val = 0; + + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &val, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + ret = SQLExecute(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Fetching value. + ret = SQLFetch(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Resetting parameters. + ret = SQLFreeStmt(stmt, SQL_RESET_PARAMS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Resetting columns. + ret = SQLFreeStmt(stmt, SQL_UNBIND); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Closing cursor. + ret = SQLFreeStmt(stmt, SQL_CLOSE); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + return val; + } + + void CheckValues(int32_t begin, int32_t end) + { + SQLCHAR req[] = "SELECT _key, strField FROM TestType WHERE _key >= ? AND _key < ? ORDER BY _key"; + + SQLRETURN ret = SQLPrepare(stmt, req, SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + int64_t p1 = begin; + int64_t p2 = end; + + // Binding parameters. 
+ ret = SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p1, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + ret = SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &p2, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + int64_t keyVal = 0; + char strField[1024] = {0}; + SQLLEN strFieldLen = 0; + + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &keyVal, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + ret = SQLBindCol(stmt, 2, SQL_C_CHAR, &strField, sizeof(strField), &strFieldLen); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + ret = SQLExecute(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Fetching values. + for (int32_t i = begin; i < end; ++i) + { + ret = SQLFetch(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(i, keyVal); + BOOST_CHECK_EQUAL(getTestString(i), std::string(strField, static_cast<size_t>(strFieldLen))); + } + + // Resetting parameters. + ret = SQLFreeStmt(stmt, SQL_RESET_PARAMS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Resetting columns. + ret = SQLFreeStmt(stmt, SQL_UNBIND); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + // Closing cursor. + ret = SQLFreeStmt(stmt, SQL_CLOSE); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + } + + /** Node started during the test. */ + Ignite grid; + + /** Cache. 
*/ + cache::Cache<int32_t, TestType> cache; +}; + +BOOST_FIXTURE_TEST_SUITE(StreamingTestSuite, StreamingTestSuiteFixture) + +BOOST_AUTO_TEST_CASE(TestStreamingSimple) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery("set streaming on batch_size 100 flush_frequency 100"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + InsertTestStrings(10, 110); + + res = ExecQuery("set streaming off"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 110); + + CheckValues(0, 110); +} + +BOOST_AUTO_TEST_CASE(TestStreamingAllOptions) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery( + "set streaming 1 " + "allow_overwrite on " + "batch_size 512 " + "per_node_buffer_size 500 " + "per_node_parallel_operations 4 " + "flush_frequency 100 " + "ordered"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + InsertTestStrings(0, 512); + + res = ExecQuery("set streaming off"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 512); +} + +BOOST_AUTO_TEST_CASE(TestStreamingNotAllowedOverwrite) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery("set streaming 1 allow_overwrite off batch_size 10"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + InsertTestStrings(0, 10); + + res = ExecQuery("set streaming off"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 10); +} + 
+BOOST_AUTO_TEST_CASE(TestStreamingReset) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + InsertTestStrings(10, 20); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = ExecQuery("set streaming 1 batch_size 10 flush_frequency 100"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 20); + + InsertTestStrings(20, 50); + + BOOST_CHECK_EQUAL(cache.Size(), 20); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 50); +} + +BOOST_AUTO_TEST_CASE(TestStreamingClosingStatement) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + SQLFreeHandle(SQL_HANDLE_STMT, stmt); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 10); +} + +BOOST_AUTO_TEST_CASE(TestStreamingSeveralStatements) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLHSTMT stmt2; + + SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 1 
batch_size 100 flush_frequency 1000"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + InsertTestStrings(stmt2, 10, 20); + + InsertTestStrings(20, 30); + + InsertTestStrings(stmt2, 30, 50); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 50); + + res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2)); +} + +BOOST_AUTO_TEST_CASE(TestStreamingSeveralStatementsClosing) +{ + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLHSTMT stmt2; + + SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, 10); + + InsertTestStrings(stmt2, 10, 20); + + InsertTestStrings(20, 30); + + InsertTestStrings(stmt2, 30, 50); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = SQLFreeHandle(SQL_HANDLE_STMT, stmt); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2)); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 50); +} + +BOOST_AUTO_TEST_CASE(TestStreamingDifferentStatements) +{ + Connect("DRIVER={Apache 
Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLHSTMT stmt2; + + SQLRETURN res = SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 1 batch_size 100 flush_frequency 1000"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings2(0, 10); + + InsertTestStrings(stmt2, 10, 20); + + InsertTestStrings(20, 30); + + InsertTestStrings2(stmt2, 30, 50); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = SQLFreeHandle(SQL_HANDLE_STMT, stmt2); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt2)); + + BOOST_CHECK_EQUAL(cache.Size(), 0); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), 50); + + CheckValues(0, 50); + + BOOST_CHECK_EQUAL(GetI32Field(8), 42); + BOOST_CHECK_EQUAL(GetI32Field(13), 0); + BOOST_CHECK_EQUAL(GetI32Field(42), 42); +} + +BOOST_AUTO_TEST_CASE(TestStreamingManyObjects) +{ + const static int32_t OBJECT_NUM = 100000; + + Connect("DRIVER={Apache Ignite};SERVER=127.0.0.1;PORT=11110;SCHEMA=cache"); + + SQLRETURN res = ExecQuery("set streaming on"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + InsertTestStrings(0, OBJECT_NUM); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + res = ExecQuery("set streaming 0"); + + if (res != SQL_SUCCESS) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(cache.Size(), OBJECT_NUM); + + CheckValues(0, OBJECT_NUM); +} + +BOOST_AUTO_TEST_SUITE_END() http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/teamcity/teamcity_boost.cpp ---------------------------------------------------------------------- diff --git 
a/modules/platforms/cpp/odbc-test/src/teamcity/teamcity_boost.cpp b/modules/platforms/cpp/odbc-test/src/teamcity/teamcity_boost.cpp index 9a14fe9..383ada0 100644 --- a/modules/platforms/cpp/odbc-test/src/teamcity/teamcity_boost.cpp +++ b/modules/platforms/cpp/odbc-test/src/teamcity/teamcity_boost.cpp @@ -19,7 +19,7 @@ #include <sstream> -#include <boost/test/unit_test_suite_impl.hpp> +#include <boost/test/unit_test_suite.hpp> #include <boost/test/results_collector.hpp> #include <boost/test/utils/basic_cstring/io.hpp> #include <boost/test/unit_test_log.hpp> http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/test_utils.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/test_utils.cpp b/modules/platforms/cpp/odbc-test/src/test_utils.cpp index 9c1aca9..fc8cbd3 100644 --- a/modules/platforms/cpp/odbc-test/src/test_utils.cpp +++ b/modules/platforms/cpp/odbc-test/src/test_utils.cpp @@ -122,6 +122,19 @@ namespace ignite_test return Ignition::Start(cfg, name); } + ignite::Ignite StartPlatformNode(const char* cfg, const char* name) + { + std::string config(cfg); + +#ifdef IGNITE_TESTS_32 + // Cutting off the ".xml" part. 
+ config.resize(config.size() - 4); + config += "-32.xml"; +#endif //IGNITE_TESTS_32 + + return StartNode(config.c_str(), name); + } + std::string AppendPath(const std::string& base, const std::string& toAdd) { std::stringstream stream; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc-test/src/types_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/types_test.cpp b/modules/platforms/cpp/odbc-test/src/types_test.cpp index d5c7834..00ab8bd 100644 --- a/modules/platforms/cpp/odbc-test/src/types_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/types_test.cpp @@ -53,7 +53,7 @@ struct TypesTestSuiteFixture : odbc::OdbcTestSuite TypesTestSuiteFixture() : cache1(0) { - node = StartTestNode("queries-test.xml", "NodeMain"); + node = StartPlatformNode("queries-test.xml", "NodeMain"); cache1 = node.GetCache<int64_t, TestType>("cache"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/Makefile.am b/modules/platforms/cpp/odbc/Makefile.am index f706f4f..5a8ed6d 100644 --- a/modules/platforms/cpp/odbc/Makefile.am +++ b/modules/platforms/cpp/odbc/Makefile.am @@ -77,9 +77,16 @@ libignite_odbc_la_SOURCES = \ src/query/table_metadata_query.cpp \ src/query/type_info_query.cpp \ src/query/special_columns_query.cpp \ + src/query/streaming_query.cpp \ src/ssl/ssl_gateway.cpp \ src/ssl/secure_socket_client.cpp \ src/ssl/ssl_mode.cpp \ + src/sql/sql_lexer.cpp \ + src/sql/sql_parser.cpp \ + src/sql/sql_set_streaming_command.cpp \ + src/sql/sql_utils.cpp \ + src/streaming/streaming_batch.cpp \ + src/streaming/streaming_context.cpp \ src/protocol_version.cpp \ src/result_page.cpp \ src/row.cpp \ http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/Makefile.am 
---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/Makefile.am b/modules/platforms/cpp/odbc/include/Makefile.am index 9bdb803..be6e059 100644 --- a/modules/platforms/cpp/odbc/include/Makefile.am +++ b/modules/platforms/cpp/odbc/include/Makefile.am @@ -21,10 +21,12 @@ noinst_HEADERS = \ ignite/odbc.h \ ignite/odbc/query/table_metadata_query.h \ ignite/odbc/query/special_columns_query.h \ + ignite/odbc/query/streaming_query.h \ ignite/odbc/query/type_info_query.h \ ignite/odbc/query/batch_query.h \ ignite/odbc/query/data_query.h \ ignite/odbc/query/foreign_keys_query.h \ + ignite/odbc/query/internal_query.h \ ignite/odbc/query/column_metadata_query.h \ ignite/odbc/query/query.h \ ignite/odbc/query/primary_keys_query.h \ @@ -54,6 +56,14 @@ noinst_HEADERS = \ ignite/odbc/ssl/ssl_bindings.h \ ignite/odbc/ssl/secure_socket_client.h \ ignite/odbc/ssl/ssl_gateway.h \ + ignite/odbc/sql/sql_command.h \ + ignite/odbc/sql/sql_lexer.h \ + ignite/odbc/sql/sql_parser.h \ + ignite/odbc/sql/sql_set_streaming_command.h \ + ignite/odbc/sql/sql_token.h \ + ignite/odbc/sql/sql_utils.h \ + ignite/odbc/streaming/streaming_batch.h \ + ignite/odbc/streaming/streaming_context.h \ ignite/odbc/connection.h \ ignite/odbc/odbc_error.h \ ignite/odbc/message.h \ http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/config/connection_string_parser.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/config/connection_string_parser.h b/modules/platforms/cpp/odbc/include/ignite/odbc/config/connection_string_parser.h index 605109e..1fdedf6 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/config/connection_string_parser.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/config/connection_string_parser.h @@ -119,10 +119,10 @@ namespace ignite * * @param str String to parse. 
* @param len String length. - * @param delimeter Delimeter. + * @param delimiter delimiter. * @param diag Diagnostics collector. */ - void ParseConnectionString(const char* str, size_t len, char delimeter, + void ParseConnectionString(const char* str, size_t len, char delimiter, diagnostic::DiagnosticRecordStorage* diag); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h index b3ae5fc..27336cf 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/connection.h @@ -27,6 +27,7 @@ #include "ignite/odbc/config/connection_info.h" #include "ignite/odbc/config/configuration.h" #include "ignite/odbc/diagnostic/diagnosable_adapter.h" +#include "ignite/odbc/streaming/streaming_context.h" #include "ignite/odbc/odbc_error.h" namespace ignite @@ -113,6 +114,20 @@ namespace ignite /** * Send data by established connection. + * Uses connection timeout. + * + * @param data Data buffer. + * @param len Data length. + * @return @c true on success, @c false on timeout. + * @throw OdbcError on error. + */ + bool Send(const int8_t* data, size_t len) + { + return Send(data, len, timeout); + } + + /** + * Send data by established connection. * * @param data Data buffer. * @param len Data length. @@ -151,7 +166,17 @@ namespace ignite * * @return @c true if the auto commit is enabled. */ - bool IsAutoCommit(); + bool IsAutoCommit() const; + + /** + * Get streaming context. + * + * @return Streaming context. + */ + streaming::StreamingContext& GetStreamingContext() + { + return streamingContext; + } /** * Create diagnostic record associated with the Connection instance. @@ -230,6 +255,28 @@ namespace ignite } /** + * Send request message. 
+ * Uses connection timeout. + * + * @param req Request message. + * @throw OdbcError on error. + */ + template<typename ReqT> + void SendRequest(const ReqT& req) + { + EnsureConnected(); + + std::vector<int8_t> tempBuffer; + + parser.Encode(req, tempBuffer); + + bool success = Send(tempBuffer.data(), tempBuffer.size(), timeout); + + if (!success) + throw OdbcError(SqlState::SHYT01_CONNECTION_TIMEOUT, "Send operation timed out"); + } + + /** * Perform transaction commit. */ void TransactionCommit(); @@ -470,6 +517,9 @@ namespace ignite /** Connection info. */ config::ConnectionInfo info; + + /** Streaming context. */ + streaming::StreamingContext streamingContext; }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/message.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h index 87ba064..79e78c3 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h @@ -35,6 +35,12 @@ namespace ignite { namespace odbc { + namespace streaming + { + // Forward declaration. + class StreamingBatch; + } + struct ClientType { enum Type @@ -63,7 +69,9 @@ namespace ignite EXECUTE_SQL_QUERY_BATCH = 8, - QUERY_MORE_RESULTS = 9 + QUERY_MORE_RESULTS = 9, + + STREAMING_BATCH = 10 }; }; @@ -146,7 +154,7 @@ namespace ignite /** * Query execute batch request. */ - class QueryExecuteBatchtRequest + class QueryExecuteBatchRequest { public: /** @@ -155,19 +163,19 @@ namespace ignite * @param schema Schema. * @param sql SQL query. * @param params Query arguments. - * @param begin Beginng of the interval. + * @param begin Beginning of the interval. * @param end End of the interval. * @param timeout Timeout. * @param autoCommit Auto commit flag. 
*/ - QueryExecuteBatchtRequest(const std::string& schema, const std::string& sql, + QueryExecuteBatchRequest(const std::string& schema, const std::string& sql, const app::ParameterSet& params, SqlUlen begin, SqlUlen end, bool last, int32_t timeout, bool autoCommit); /** * Destructor. */ - ~QueryExecuteBatchtRequest(); + ~QueryExecuteBatchRequest(); /** * Write request using provided writer. @@ -186,7 +194,7 @@ namespace ignite /** Parameters bindings. */ const app::ParameterSet& params; - /** Beginng of the interval. */ + /** Beginning of the interval. */ SqlUlen begin; /** End of the interval. */ @@ -426,6 +434,49 @@ namespace ignite }; /** + * Streaming batch request. + */ + class StreamingBatchRequest + { + public: + /** + * Constructor. + * + * @param schema Schema. + * @param batch Batch. + * @param last Last batch indicator. + * @param order Order. + */ + StreamingBatchRequest(const std::string& schema, const streaming::StreamingBatch& batch, + bool last, int64_t order); + + /** + * Destructor. + */ + ~StreamingBatchRequest(); + + /** + * Write request using provided writer. + * @param writer Writer. + */ + void Write(impl::binary::BinaryWriterImpl& writer, const ProtocolVersion&) const; + + private: + /** Schema name. */ + std::string schema; + + /** Batch. */ + const streaming::StreamingBatch& batch; + + /** Last page flag. */ + bool last; + + /** Order. */ + int64_t order; + }; + + + /** * General response. */ class Response @@ -698,6 +749,67 @@ namespace ignite }; /** + * Streaming batch response. + */ + class StreamingBatchResponse : public Response + { + public: + /** + * Constructor. + */ + StreamingBatchResponse(); + + /** + * Destructor. + */ + virtual ~StreamingBatchResponse(); + + /** + * Get error message. + * @return Error message. + */ + const std::string& GetErrorMessage() const + { + return errorMessage; + } + + /** + * Get error code. + * @return Error code. 
+ */ + int32_t GetErrorCode() const + { + return errorCode; + } + + /** + * Get order. + * @return Order. + */ + int64_t GetOrder() const + { + return order; + } + + private: + /** + * Read response using provided reader. + * @param reader Reader. + * @param ver Protocol version. + */ + virtual void ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& ver); + + /** Error message. */ + std::string errorMessage; + + /** Error code. */ + int32_t errorCode; + + /** Order. */ + int64_t order; + }; + + /** * Query fetch response. */ class QueryFetchResponse : public Response http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/odbc_error.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/odbc_error.h b/modules/platforms/cpp/odbc/include/ignite/odbc/odbc_error.h index 361b2b5..9b3b48d 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/odbc_error.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/odbc_error.h @@ -20,7 +20,8 @@ #include <string> -#include "ignite/odbc/common_types.h" +#include <ignite/odbc/common_types.h> +#include <ignite/common/expected.h> namespace ignite { @@ -56,6 +57,18 @@ namespace ignite } /** + * Copy constructor. + * + * @param other Other instance. + */ + OdbcError(const OdbcError& other) : + status(other.status), + errMessage(other.errMessage) + { + // No-op. + } + + /** * Destructor. */ ~OdbcError() @@ -88,6 +101,27 @@ namespace ignite /** Error message. */ std::string errMessage; }; + + typedef common::Unexpected<OdbcError> OdbcUnexpected; + + /** + * Expected specialization for OdbcError. + */ + template<typename R> + struct OdbcExpected : common::Expected<R, OdbcError> + { + OdbcExpected(const R& res) + : common::Expected<R, OdbcError>(res) + { + // No-op. + } + + OdbcExpected(const OdbcError& err) + : common::Expected<R, OdbcError>(OdbcUnexpected(err)) + { + // No-op. 
+ } + }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h index d742490..354393e 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h @@ -108,7 +108,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. */ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h index 9abd8b2..81e8093 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h @@ -109,7 +109,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. 
*/ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/internal_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/internal_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/internal_query.h new file mode 100644 index 0000000..67fbd1a --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/internal_query.h @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_QUERY_INTERNAL_QUERY +#define _IGNITE_ODBC_QUERY_INTERNAL_QUERY + +#include <stdint.h> + +#include <map> + +#include "ignite/odbc/diagnostic/diagnosable.h" +#include "ignite/odbc/meta/column_meta.h" +#include "ignite/odbc/common_types.h" +#include "ignite/odbc/query/query.h" +#include "ignite/odbc/sql/sql_command.h" + +namespace ignite +{ + namespace odbc + { + namespace query + { + /** + * Query. + */ + class InternalQuery : public Query + { + public: + /** + * Constructor. + * + * @param diag Diagnosable. + * @param sql SQL query. + * @param cmd Parsed command. 
+ */ + InternalQuery(diagnostic::Diagnosable& diag, const std::string& sql, std::auto_ptr<SqlCommand> cmd) : + Query(diag, QueryType::INTERNAL), + sql(sql), + cmd(cmd) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~InternalQuery() + { + // No-op. + } + + /** + * Execute query. + * + * @return True on success. + */ + virtual SqlResult::Type Execute() + { + diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, "Internal error."); + + return SqlResult::AI_ERROR; + } + + /** + * Fetch next result row to application buffers. + * + * @param columnBindings Application buffers to put data to. + * @return Operation result. + */ + virtual SqlResult::Type FetchNextRow(app::ColumnBindingMap& columnBindings) + { + (void) columnBindings; + + return SqlResult::AI_NO_DATA; + } + + /** + * Get data of the specified column in the result set. + * + * @param columnIdx Column index. + * @param buffer Buffer to put column data to. + * @return Operation result. + */ + virtual SqlResult::Type GetColumn(uint16_t columnIdx, app::ApplicationDataBuffer& buffer) + { + (void) columnIdx; + (void) buffer; + + return SqlResult::AI_NO_DATA; + } + + /** + * Close query. + * + * @return Operation result. + */ + virtual SqlResult::Type Close() + { + return SqlResult::AI_SUCCESS; + } + + /** + * Get column metadata. + * + * @return Column metadata. + */ + virtual const meta::ColumnMetaVector& GetMeta() const + { + static const meta::ColumnMetaVector empty; + + return empty; + } + + /** + * Check if data is available. + * + * @return True if data is available. + */ + virtual bool DataAvailable() const + { + return false; + } + + /** + * Get number of rows affected by the statement. + * + * @return Number of rows affected by the statement. + */ + virtual int64_t AffectedRows() const + { + return 0; + } + + /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet() + { + return SqlResult::AI_NO_DATA; + } + + /** + * Get SQL query. 
+ * + * @return SQL query. + */ + SqlCommand& GetCommand() const + { + return *cmd; + } + + /** + * Get SQL query. + * + * @return SQL Query. + */ + const std::string& GetQuery() const + { + return sql; + } + + protected: + /** SQL string*/ + std::string sql; + + /** SQL command. */ + std::auto_ptr<SqlCommand> cmd; + }; + } + } +} + +#endif //_IGNITE_ODBC_QUERY_INTERNAL_QUERY http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h index 42f7e26..3650fcf 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h @@ -106,7 +106,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. */ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h index 9d54b90..22503a1 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h @@ -47,6 +47,9 @@ namespace ignite /** Batch query type. */ BATCH, + /** Streaming query type. */ + STREAMING, + /** Foreign keys query type. */ FOREIGN_KEYS, @@ -60,7 +63,10 @@ namespace ignite TABLE_METADATA, /** Type info query type. */ - TYPE_INFO + TYPE_INFO, + + /** Internal query, that should be parsed by a driver itself. 
*/ + INTERNAL }; }; @@ -133,7 +139,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. */ virtual SqlResult::Type NextResultSet() = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h index d6a5c44..919febf 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h @@ -108,7 +108,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. */ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/streaming_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/streaming_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/streaming_query.h new file mode 100644 index 0000000..cf87e80 --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/streaming_query.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_QUERY_STREAMING_QUERY +#define _IGNITE_ODBC_QUERY_STREAMING_QUERY + +#include "ignite/odbc/query/query.h" +#include "ignite/odbc/app/parameter_set.h" + +namespace ignite +{ + namespace odbc + { + /** Connection forward-declaration. */ + class Connection; + + namespace query + { + /** + * Streaming Query. + */ + class StreamingQuery : public Query + { + public: + /** + * Constructor. + * + * @param diag Diagnostics collector. + * @param connection Associated connection. + * @param params SQL params. + */ + StreamingQuery( + diagnostic::Diagnosable& diag, + Connection& connection, + const app::ParameterSet& params); + + /** + * Destructor. + */ + virtual ~StreamingQuery(); + + /** + * Execute query. + * + * @return True on success. + */ + virtual SqlResult::Type Execute(); + + /** + * Get column metadata. + * + * @return Column metadata. + */ + virtual const meta::ColumnMetaVector& GetMeta() const; + + /** + * Fetch next result row to application buffers. + * + * @param columnBindings Application buffers to put data to. + * @return Operation result. + */ + virtual SqlResult::Type FetchNextRow(app::ColumnBindingMap& columnBindings); + + /** + * Get data of the specified column in the result set. + * + * @param columnIdx Column index. + * @param buffer Buffer to put column data to. + * @return Operation result. + */ + virtual SqlResult::Type GetColumn(uint16_t columnIdx, app::ApplicationDataBuffer& buffer); + + /** + * Close query. + * + * @return Result. 
+ */ + virtual SqlResult::Type Close(); + + /** + * Check if data is available. + * + * @return True if data is available. + */ + virtual bool DataAvailable() const; + + /** + * Get number of rows affected by the statement. + * + * @return Number of rows affected by the statement. + */ + virtual int64_t AffectedRows() const; + + /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet(); + + /** + * Get SQL query string. + * + * @return SQL query string. + */ + const std::string& GetSql() const + { + return sql; + } + + /** + * Prepare query for execution in a streaming mode. + * + * @param query Query. + */ + void PrepareQuery(const std::string& query) + { + sql = query; + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(StreamingQuery); + + /** Connection associated with the statement. */ + Connection& connection; + + /** SQL Query. */ + std::string sql; + + /** Parameter bindings. */ + const app::ParameterSet& params; + }; + } + } +} + +#endif //_IGNITE_ODBC_QUERY_STREAMING_QUERY http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h index 759bfd6..776b747 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h @@ -109,7 +109,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. 
*/ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h index 974ee01..3f2e76c 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h @@ -99,7 +99,7 @@ namespace ignite /** * Move to the next result set. * - * @return Operatoin result. + * @return Operation result. */ virtual SqlResult::Type NextResultSet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_command.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_command.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_command.h new file mode 100644 index 0000000..28d4c39 --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_command.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_SQL_SQL_COMMAND +#define _IGNITE_ODBC_SQL_SQL_COMMAND + +namespace ignite +{ + namespace odbc + { + class SqlLexer; + + /** + * SQL command type. + */ + struct SqlCommandType + { + enum Type + { + SET_STREAMING + }; + }; + + /** + * SQL command. + */ + class SqlCommand + { + public: + /** + * Constructor. + * + * @param typ Type. + */ + SqlCommand(SqlCommandType::Type typ) : + typ(typ) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~SqlCommand() + { + // No-op. + } + + /** + * Get type. + * + * @return Type. + */ + SqlCommandType::Type GetType() const + { + return typ; + } + + /** + * Parse from lexer. + * + * @param lexer Lexer. + */ + virtual void Parse(SqlLexer& lexer) = 0; + + protected: + /** Type. */ + SqlCommandType::Type typ; + }; + } +} + +#endif //_IGNITE_ODBC_SQL_SQL_COMMAND \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_lexer.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_lexer.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_lexer.h new file mode 100644 index 0000000..fb57173 --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_lexer.h @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_SQL_SQL_LEXER +#define _IGNITE_ODBC_SQL_SQL_LEXER + +#include <stdint.h> +#include <string> + +#include <ignite/odbc/odbc_error.h> +#include <ignite/odbc/sql/sql_token.h> + +namespace ignite +{ + namespace odbc + { + /** + * SQL lexer. + */ + class SqlLexer + { + public: + /** + * Constructor. + * + * @param sql SQL string. + */ + SqlLexer(const std::string& sql); + + /** + * Destructor. + */ + ~SqlLexer(); + + /** + * Move to the next token. + * + * @return @c true if next token was found and @c false otherwise. + */ + OdbcExpected<bool> Shift(); + + /** + * Check that the following token is the expected one. + * Shifts to next token if possible. + * + * @param typ Token type. + * @param expected Expected token. Should be lowercase. + * @return @c true if the next token is expected and @c false otherwise. + */ + bool ExpectNextToken(TokenType::Type typ, const char* expected); + + /** + * Check if the end of data reached. + * + * @return @c true if the end of data reached. + */ + bool IsEod() const; + + /** + * Get current token. + * + * @return Current token. + */ + const SqlToken& GetCurrentToken() const + { + return currentToken; + } + + private: + /** + * Set end of data state. + */ + void SetEod(); + + /** + * Have enough data. + * + * @param num Number of chars we need. + * @return @c true if we have and false otherwise. + */ + bool HaveData(int32_t num) const; + + /** + * Check if the char is delimiter. + * + * @param c Character + * @return True if the character is delimiter. 
+ */ + static bool IsDelimiter(int c); + + /** SQL string. */ + const std::string& sql; + + /** Current lexer position in string. */ + int32_t pos; + + /** Current token. */ + SqlToken currentToken; + }; + } +} + +#endif //_IGNITE_ODBC_SQL_SQL_LEXER \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_parser.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_parser.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_parser.h new file mode 100644 index 0000000..543d36c --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_parser.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_SQL_SQL_PARSER +#define _IGNITE_ODBC_SQL_SQL_PARSER + +#include <string> +#include <memory> + +#include <ignite/odbc/sql/sql_lexer.h> +#include <ignite/odbc/sql/sql_command.h> + +namespace ignite +{ + namespace odbc + { + /** + * SQL parser. + */ + class SqlParser + { + public: + /** + * Default constructor. + * + * @param sql SQL request. 
+ */ + SqlParser(const std::string& sql); + + /** + * Destructor. + */ + ~SqlParser(); + + /** + * Get next command. + * + * @return Parsed command on success and null on failure. + */ + std::auto_ptr<SqlCommand> GetNextCommand(); + + private: + /** + * Process a single command. NOTE(review): contract is not documented in this header; presumably returns null on failure like GetNextCommand() - confirm against the implementation. + */ + std::auto_ptr<SqlCommand> ProcessCommand(); + + /** SQL lexer. */ + SqlLexer lexer; + }; + } +} + +#endif //_IGNITE_ODBC_SQL_SQL_PARSER \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_set_streaming_command.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_set_streaming_command.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_set_streaming_command.h new file mode 100644 index 0000000..02cb0a5 --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_set_streaming_command.h @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _IGNITE_ODBC_SQL_SQL_SET_STREAMING_COMMAND +#define _IGNITE_ODBC_SQL_SQL_SET_STREAMING_COMMAND + +#include <stdint.h> +#include <string> + +#include <ignite/odbc/sql/sql_command.h> + +namespace ignite +{ + namespace odbc + { + class SqlToken; + + /** + * SQL set streaming command. + */ + class SqlSetStreamingCommand : public SqlCommand + { + /** Default batch size for driver. */ + enum { DEFAULT_STREAM_BATCH_SIZE = 2048 }; + + public: + /** + * Default constructor. + */ + SqlSetStreamingCommand(); + + /** + * Destructor. + */ + virtual ~SqlSetStreamingCommand(); + + /** + * Parse from lexer. + * + * @param lexer Lexer. + */ + virtual void Parse(SqlLexer& lexer); + + /** + * Check if the streaming is enabled. + * + * @return @c true if enabled. + */ + bool IsEnabled() const + { + return enabled; + } + + /** + * Check if the overwrite is allowed. + * + * @return @c true if allowed. + */ + bool IsAllowOverwrite() const + { + return allowOverwrite; + } + + /** + * Get batch size. + * + * @return Batch size. + */ + int32_t GetBatchSize() const + { + return batchSize; + } + + /** + * Get parallel operations per node. + * + * @return Parallel operations per node. + */ + int32_t GetParallelOperationsPerNode() const + { + return parallelOpsPerNode; + } + + /** + * Get buffer size per node. + * + * @return Buffer size per node. + */ + int32_t GetBufferSizePerNode() const + { + return bufferSizePerNode; + } + + /** + * Get flush frequency. + * + * @return Flush frequency. + */ + int64_t GetFlushFrequency() const + { + return flushFrequency; + } + + /** + * Check if the streaming is ordered. + * + * @return @c true if ordered. + */ + bool IsOrdered() const + { + return ordered; + } + + private: + /** + * Check that the streaming mode is enabled. NOTE(review): @c token is presumably the token reported when it is not - confirm against the implementation. + */ + void CheckEnabled(const SqlToken& token) const; + + /** + * Throw exception, showing that token is unexpected. + * + * @param token Token. + * @param expected Expected details. 
+ */ + static void ThrowUnexpectedTokenError(const SqlToken& token, const std::string& expected); + + /** + * Throw exception, showing that the statement ended unexpectedly. + * + * @param expected Expected details. + */ + static void ThrowUnexpectedEndOfStatement(const std::string& expected); + + /** + * Get int or throw parsing exception. + * + * @param lexer Lexer to use. + * @return Integer number. + */ + static int32_t ExpectInt(SqlLexer& lexer); + + /** + * Get positive int or throw parsing exception. + * + * @param lexer Lexer to use. + * @param description Param description to use in exception on error. + * @return Positive integer number. + */ + static int32_t ExpectPositiveInteger(SqlLexer& lexer, const std::string& description); + + /** + * Get bool or throw parsing exception. + * + * @param lexer Lexer to use. + * @return Boolean value. + */ + static bool ExpectBool(SqlLexer& lexer); + + /** Whether streaming must be turned on or off by this command. */ + bool enabled; + + /** Whether existing values should be overwritten on keys duplication. */ + bool allowOverwrite; + + /** Batch size for driver. */ + int32_t batchSize; + + /** Per node number of parallel operations. */ + int32_t parallelOpsPerNode; + + /** Per node buffer size. */ + int32_t bufferSizePerNode; + + /** Streamer flush timeout. */ + int64_t flushFrequency; + + /** Ordered streamer. 
*/ + bool ordered; + }; + } +} + +#endif //_IGNITE_ODBC_SQL_SQL_SET_STREAMING_COMMAND \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9f8d331d/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_token.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_token.h b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_token.h new file mode 100644 index 0000000..5db709f --- /dev/null +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/sql/sql_token.h @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_ODBC_SQL_SQL_TOKEN +#define _IGNITE_ODBC_SQL_SQL_TOKEN + +#include <stdint.h> + +#include <ignite/common/utils.h> + +namespace ignite +{ + namespace odbc + { + /** + * Token type. + */ + struct TokenType + { + enum Type + { + /** Minus token. */ + MINUS, + + /** Quoted token. */ + QUOTED, + + /** String literal token. */ + STRING, + + /** Dot. */ + DOT, + + /** Comma. */ + COMMA, + + /** Parenthesis: left. */ + PARENTHESIS_LEFT, + + /** Parenthesis: right. */ + PARENTHESIS_RIGHT, + + /** Semicolon. */ + SEMICOLON, + + /** Simple word. */ + WORD, + + /** End of data. 
*/ + EOD + }; + }; + + /** + * SQL token. Non-owning: stores only a pointer and a size, so the pointed-to characters must outlive the token. + */ + class SqlToken + { + public: + /** + * Constructor. + * + * @param token Token begin pointer. + * @param size Token size in characters. + * @param typ Token type. + */ + SqlToken(const char* token, int32_t size, TokenType::Type typ) : + token(token), + size(size), + typ(typ) + { + // No-op. + } + + /** + * Get type. + * + * @return Current token type. + */ + TokenType::Type GetType() const + { + return typ; + } + + /** + * Get token value. + * + * @return Pointer to the beginning of the value. Size of the value can be obtained with @c GetSize(). + */ + const char* GetValue() const + { + return token; + } + + /** + * Get size. + * + * @return Token size in characters. + */ + int32_t GetSize() const + { + return size; + } + + /** + * Convert to string. + * + * @return Copy of the token text, or an empty string if the pointer is null or the size is not positive. + */ + std::string ToString() const + { + if (!token || size <= 0) + return std::string(); + + return std::string(token, static_cast<size_t>(size)); + } + + /** + * Convert to lowercase string. + * + * @return Lowercase copy of the token text. + */ + std::string ToLower() const + { + std::string str(ToString()); + + common::IntoLower(str); + + return str; + } + + private: + /** Current token begin. */ + const char* token; + + /** Current token size. */ + int32_t size; + + /** Current token type. */ + TokenType::Type typ; + }; + } +} + +#endif //_IGNITE_ODBC_SQL_SQL_TOKEN \ No newline at end of file