Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 071ac1e98 -> 41a5e62d8
MINIFI-269: Add Site2Site Test case This closes #81. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/41a5e62d Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/41a5e62d Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/41a5e62d Branch: refs/heads/master Commit: 41a5e62d877d422462a75238e6e5d148890ef2a1 Parents: 071ac1e Author: Bin Qiu <benqiu2...@gmail.com> Authored: Mon Apr 24 15:07:25 2017 -0700 Committer: Aldrin Piri <ald...@apache.org> Committed: Fri Apr 28 14:33:58 2017 -0400 ---------------------------------------------------------------------- libminifi/src/core/yaml/YamlConfiguration.cpp | 2 +- libminifi/test/unit/SerializationTests.cpp | 89 ++------- libminifi/test/unit/Site2SiteTests.cpp | 202 +++++++++++++++++++++ libminifi/test/unit/SiteToSiteHelper.h | 156 ++++++++++++++++ 4 files changed, 372 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index d76b9f3..5484e36 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -339,7 +339,7 @@ void YamlConfiguration::parseProvenanceReportingYaml( return; } - if (!reportNode || !(reportNode->IsSequence())) { + if (!reportNode || !reportNode->IsDefined() || reportNode->IsNull()) { logger_->log_debug("no provenance reporting task specified"); return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/SerializationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp index c6ada33..0841fbd 100644 --- a/libminifi/test/unit/SerializationTests.cpp +++ b/libminifi/test/unit/SerializationTests.cpp @@ -16,69 +16,21 @@ * limitations under the License. */ - #include "io/BaseStream.h" #include "Site2SitePeer.h" #include "Site2SiteClientProtocol.h" #include <uuid/uuid.h> +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "SiteToSiteHelper.h" #include <algorithm> #include <string> #include <memory> #include "../TestBase.h" #define FMT_DEFAULT fmt_lower - using namespace org::apache::nifi::minifi::io; -TEST_CASE("TestSetPortId", "[S2S1]"){ - - - std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<minifi::Site2SitePeer>( new minifi::Site2SitePeer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433)); - - minifi::Site2SiteClientProtocol protocol(std::move(peer)); - - - std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538"; - - uuid_t fakeUUID; - - uuid_parse(uuid_str.c_str(),fakeUUID); - - protocol.setPortId(fakeUUID); - - REQUIRE( uuid_str == protocol.getPortId() ); - - - -} - -TEST_CASE("TestSetPortIdUppercase", "[S2S2]"){ - - - std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<minifi::Site2SitePeer>( new minifi::Site2SitePeer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433)); - - minifi::Site2SiteClientProtocol protocol(std::move(peer)); - - - std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; - - uuid_t fakeUUID; - - uuid_parse(uuid_str.c_str(),fakeUUID); - - protocol.setPortId(fakeUUID); - - REQUIRE( uuid_str != protocol.getPortId() ); - - std::transform(uuid_str.begin(),uuid_str.end(),uuid_str.begin(),::tolower); - - REQUIRE( uuid_str == protocol.getPortId() ); - - - -} - - -TEST_CASE("TestWriteUTF", "[MINIFI193]"){ +TEST_CASE("TestWriteUTF", "[MINIFI193]") { DataStream baseStream; @@ -86,22 +38,15 @@ TEST_CASE("TestWriteUTF", "[MINIFI193]"){ std::string stringOne = "helo world"; // yes, this has a typo. std::string verifyString; - ser.writeUTF(stringOne,&baseStream,false); - + ser.writeUTF(stringOne, &baseStream, false); - ser.readUTF(verifyString,&baseStream,false); + ser.readUTF(verifyString, &baseStream, false); REQUIRE(verifyString == stringOne); - - - } - - - -TEST_CASE("TestWriteUTF2", "[MINIFI193]"){ +TEST_CASE("TestWriteUTF2", "[MINIFI193]") { DataStream baseStream; @@ -110,20 +55,15 @@ TEST_CASE("TestWriteUTF2", "[MINIFI193]"){ std::string stringOne = "hel\xa1o world"; REQUIRE(11 == stringOne.length()); std::string verifyString; - ser.writeUTF(stringOne,&baseStream,false); - + ser.writeUTF(stringOne, &baseStream, false); - ser.readUTF(verifyString,&baseStream,false); + ser.readUTF(verifyString, &baseStream, false); REQUIRE(verifyString == stringOne); - - - } - -TEST_CASE("TestWriteUTF3", "[MINIFI193]"){ +TEST_CASE("TestWriteUTF3", "[MINIFI193]") { DataStream baseStream; @@ -132,14 +72,11 @@ TEST_CASE("TestWriteUTF3", "[MINIFI193]"){ std::string stringOne = "\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c"; REQUIRE(12 == stringOne.length()); std::string verifyString; - ser.writeUTF(stringOne,&baseStream,false); - + ser.writeUTF(stringOne, &baseStream, false); - ser.readUTF(verifyString,&baseStream,false); + ser.readUTF(verifyString, &baseStream, false); REQUIRE(verifyString == stringOne); - - - } + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/Site2SiteTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp new file mode 100644 index 0000000..a67881d --- /dev/null +++ b/libminifi/test/unit/Site2SiteTests.cpp @@ -0,0 +1,202 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io/BaseStream.h" +#include "Site2SitePeer.h" +#include "Site2SiteClientProtocol.h" +#include <uuid/uuid.h> +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "SiteToSiteHelper.h" +#include <algorithm> +#include <string> +#include <memory> +#include "../TestBase.h" +#define FMT_DEFAULT fmt_lower + +using namespace org::apache::nifi::minifi::io; +TEST_CASE("TestSetPortId", "[S2S1]") { + + std::unique_ptr<minifi::Site2SitePeer> peer = + std::unique_ptr < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < DataStream > (new DataStream()), "fake_host", + 65433)); + + minifi::Site2SiteClientProtocol protocol(std::move(peer)); + + std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538"; + + uuid_t fakeUUID; + + uuid_parse(uuid_str.c_str(), fakeUUID); + + protocol.setPortId(fakeUUID); + + REQUIRE(uuid_str == protocol.getPortId()); + +} + +TEST_CASE("TestSetPortIdUppercase", "[S2S2]") { + + std::unique_ptr<minifi::Site2SitePeer> peer = + std::unique_ptr < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < DataStream > (new DataStream()), "fake_host", + 65433)); + + minifi::Site2SiteClientProtocol protocol(std::move(peer)); + + std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; + + uuid_t fakeUUID; + + uuid_parse(uuid_str.c_str(), fakeUUID); + + protocol.setPortId(fakeUUID); + + REQUIRE(uuid_str != protocol.getPortId()); + + std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(), ::tolower); + + REQUIRE(uuid_str == protocol.getPortId()); + +} + +void sunny_path_bootstrap(SiteToSiteResponder *collector) { + + char a = 0x14; // RESOURCE_OK + std::string resp_code; + resp_code.insert(resp_code.begin(), a); + collector->push_response(resp_code); + + // Handshake respond code + resp_code = "R"; + collector->push_response(resp_code); + resp_code = "C"; + collector->push_response(resp_code); + char b = 0x1; + resp_code = b; + collector->push_response(resp_code); + + // Codec Negotiation + resp_code = a; + collector->push_response(resp_code); +} + +TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") { + + SiteToSiteResponder *collector = new SiteToSiteResponder(); + + sunny_path_bootstrap(collector); + + std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr + < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < minifi::io::DataStream > (collector), "fake_host", + 65433)); + + minifi::Site2SiteClientProtocol protocol(std::move(peer)); + + std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; + + uuid_t fakeUUID; + + uuid_parse(uuid_str.c_str(), fakeUUID); + + protocol.setPortId(fakeUUID); + + REQUIRE(true == protocol.bootstrap()); + + REQUIRE(collector->get_next_client_response() == "NiFi"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "SocketFlowFileProtocol"); + collector->get_next_client_response(); + collector->get_next_client_response(); + collector->get_next_client_response(); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "nifi://fake_host:65433"); + collector->get_next_client_response(); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "GZIP"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "false"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "PORT_IDENTIFIER"); + collector->get_next_client_response(); + REQUIRE( + collector->get_next_client_response() + == "c56a4180-65aa-42ec-a945-5fd21dec0538"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "REQUEST_EXPIRATION_MILLIS"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "30000"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC"); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec"); + collector->get_next_client_response(); // codec version + + // start to send the stuff + // Create the transaction + std::string transactionID; + std::string payload = "Test MiNiFi payload"; + minifi::Transaction *transaction; + transaction = protocol.createTransaction(transactionID, minifi::SEND); + collector->get_next_client_response(); + REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES"); + std::map < std::string, std::string > attributes; + minifi::DataPacket packet(&protocol, transaction, attributes, payload); + REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == true); + collector->get_next_client_response(); + collector->get_next_client_response(); + std::string rx_payload = collector->get_next_client_response(); + ; + REQUIRE(payload == rx_payload); + +} + +TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") { + + SiteToSiteResponder *collector = new SiteToSiteResponder(); + + char a = 0xFF; + std::string resp_code; + resp_code.insert(resp_code.begin(), a); + collector->push_response(resp_code); + collector->push_response(resp_code); + + std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr + < minifi::Site2SitePeer + > (new minifi::Site2SitePeer( + std::unique_ptr < minifi::io::DataStream > (collector), "fake_host", + 65433)); + + minifi::Site2SiteClientProtocol protocol(std::move(peer)); + + std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; + + uuid_t fakeUUID; + + uuid_parse(uuid_str.c_str(), fakeUUID); + + protocol.setPortId(fakeUUID); + + REQUIRE(false == protocol.bootstrap()); + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/SiteToSiteHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h new file mode 100755 index 0000000..36a7000 --- /dev/null +++ b/libminifi/test/unit/SiteToSiteHelper.h @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_ +#define LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_ + +#include <queue> +#include "io/BaseStream.h" +#include "io/EndianCheck.h" +#include "core/Core.h" +/** + * Test repository + */ +class SiteToSiteResponder: public minifi::io::BaseStream { +private: + std::queue<std::string> server_responses_; + std::queue<std::string> client_responses_; +public: + SiteToSiteResponder() { + } + // initialize + virtual short initialize() { + return 1; + } + + void push_response(std::string resp) { + server_responses_.push(resp); + } + + std::string get_next_response() { + std::string ret = server_responses_.front(); + server_responses_.pop(); + return ret; + } + + int writeData(uint8_t *value, int size) { + client_responses_.push(std::string((char*) value, size)); + return size; + } + + bool has_next_client_response() { + return !client_responses_.empty(); + } + + std::string get_next_client_response() { + std::string ret = client_responses_.front(); + client_responses_.pop(); + return ret; + } + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint8_t &value) { + value = get_next_response().c_str()[0]; + return 1; + } + + /** + * reads two bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint16_t &base_value, bool is_little_endian = + minifi::io::EndiannessCheck::IS_LITTLE) { + base_value = std::stoi(get_next_response()); + return 2; + } + + /** + * reads a byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(char &value) { + value = get_next_response().c_str()[0]; + return 1; + } + + /** + * reads a byte array from the stream + * @param value reference in which will set the result + * @param len length to read + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint8_t *value, int len) { + std::string str = get_next_response(); + memcpy(value, str.c_str(), str.size()); + return len; + } + + virtual int readData(uint8_t *buf, int buflen) { + std::string str = get_next_response(); + memset(buf, 0x00, buflen); + memcpy(buf, str.c_str(), str.size()); + return str.size(); + } + + /** + * reads four bytes from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint32_t &value, bool is_little_endian = + minifi::io::EndiannessCheck::IS_LITTLE) { + value = std::stoul(get_next_response()); + return 4; + } + + /** + * reads eight byte from the stream + * @param value reference in which will set the result + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int read(uint64_t &value, bool is_little_endian = + minifi::io::EndiannessCheck::IS_LITTLE) { + value = std::stoull(get_next_response()); + return 4; + } + + /** + * read UTF from stream + * @param str reference string + * @param stream stream from which we will read + * @return resulting read size + **/ + virtual int readUTF(std::string &str, bool widen = false) { + str = get_next_response(); + return str.length(); + } + +}; + +#endif /* LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_ */