Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 071ac1e98 -> 41a5e62d8


MINIFI-269: Add Site2Site Test case

This closes #81.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/41a5e62d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/41a5e62d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/41a5e62d

Branch: refs/heads/master
Commit: 41a5e62d877d422462a75238e6e5d148890ef2a1
Parents: 071ac1e
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Mon Apr 24 15:07:25 2017 -0700
Committer: Aldrin Piri <ald...@apache.org>
Committed: Fri Apr 28 14:33:58 2017 -0400

----------------------------------------------------------------------
 libminifi/src/core/yaml/YamlConfiguration.cpp |   2 +-
 libminifi/test/unit/SerializationTests.cpp    |  89 ++-------
 libminifi/test/unit/Site2SiteTests.cpp        | 202 +++++++++++++++++++++
 libminifi/test/unit/SiteToSiteHelper.h        | 156 ++++++++++++++++
 4 files changed, 372 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index d76b9f3..5484e36 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -339,7 +339,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(
     return;
   }
 
-  if (!reportNode || !(reportNode->IsSequence())) {
+  if (!reportNode || !reportNode->IsDefined() || reportNode->IsNull()) {
     logger_->log_debug("no provenance reporting task specified");
     return;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/SerializationTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SerializationTests.cpp 
b/libminifi/test/unit/SerializationTests.cpp
index c6ada33..0841fbd 100644
--- a/libminifi/test/unit/SerializationTests.cpp
+++ b/libminifi/test/unit/SerializationTests.cpp
@@ -16,69 +16,21 @@
  * limitations under the License.
  */
 
-
 #include "io/BaseStream.h"
 #include "Site2SitePeer.h"
 #include "Site2SiteClientProtocol.h"
 #include <uuid/uuid.h>
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "SiteToSiteHelper.h"
 #include <algorithm>
 #include <string>
 #include <memory>
 #include "../TestBase.h"
 #define FMT_DEFAULT fmt_lower
 
-
 using namespace org::apache::nifi::minifi::io;
-TEST_CASE("TestSetPortId", "[S2S1]"){
-
-
-       std::unique_ptr<minifi::Site2SitePeer> peer = 
std::unique_ptr<minifi::Site2SitePeer>( new 
minifi::Site2SitePeer(std::unique_ptr<DataStream>(new 
DataStream()),"fake_host",65433));
-
-       minifi::Site2SiteClientProtocol protocol(std::move(peer));
-
-
-       std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538";
-
-       uuid_t fakeUUID;
-
-       uuid_parse(uuid_str.c_str(),fakeUUID);
-
-       protocol.setPortId(fakeUUID);
-
-       REQUIRE( uuid_str == protocol.getPortId() );
-
-
-
-}
-
-TEST_CASE("TestSetPortIdUppercase", "[S2S2]"){
-
-
-  std::unique_ptr<minifi::Site2SitePeer> peer = 
std::unique_ptr<minifi::Site2SitePeer>( new 
minifi::Site2SitePeer(std::unique_ptr<DataStream>(new 
DataStream()),"fake_host",65433));
-
-  minifi::Site2SiteClientProtocol protocol(std::move(peer));
-
-
-       std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
-
-       uuid_t fakeUUID;
-
-       uuid_parse(uuid_str.c_str(),fakeUUID);
-
-       protocol.setPortId(fakeUUID);
-
-       REQUIRE( uuid_str != protocol.getPortId() );
-
-       
std::transform(uuid_str.begin(),uuid_str.end(),uuid_str.begin(),::tolower);
-
-       REQUIRE( uuid_str == protocol.getPortId() );
-
-
-
-}
-
-
-TEST_CASE("TestWriteUTF", "[MINIFI193]"){
+TEST_CASE("TestWriteUTF", "[MINIFI193]") {
 
   DataStream baseStream;
 
@@ -86,22 +38,15 @@ TEST_CASE("TestWriteUTF", "[MINIFI193]"){
 
   std::string stringOne = "helo world"; // yes, this has a typo.
   std::string verifyString;
-  ser.writeUTF(stringOne,&baseStream,false);
-
+  ser.writeUTF(stringOne, &baseStream, false);
 
-  ser.readUTF(verifyString,&baseStream,false);
+  ser.readUTF(verifyString, &baseStream, false);
 
   REQUIRE(verifyString == stringOne);
 
-
-
-
 }
 
-
-
-
-TEST_CASE("TestWriteUTF2", "[MINIFI193]"){
+TEST_CASE("TestWriteUTF2", "[MINIFI193]") {
 
   DataStream baseStream;
 
@@ -110,20 +55,15 @@ TEST_CASE("TestWriteUTF2", "[MINIFI193]"){
   std::string stringOne = "hel\xa1o world";
   REQUIRE(11 == stringOne.length());
   std::string verifyString;
-  ser.writeUTF(stringOne,&baseStream,false);
-
+  ser.writeUTF(stringOne, &baseStream, false);
 
-  ser.readUTF(verifyString,&baseStream,false);
+  ser.readUTF(verifyString, &baseStream, false);
 
   REQUIRE(verifyString == stringOne);
 
-
-
-
 }
 
-
-TEST_CASE("TestWriteUTF3", "[MINIFI193]"){
+TEST_CASE("TestWriteUTF3", "[MINIFI193]") {
 
   DataStream baseStream;
 
@@ -132,14 +72,11 @@ TEST_CASE("TestWriteUTF3", "[MINIFI193]"){
   std::string stringOne = "\xe4\xbd\xa0\xe5\xa5\xbd\xe4\xb8\x96\xe7\x95\x8c";
   REQUIRE(12 == stringOne.length());
   std::string verifyString;
-  ser.writeUTF(stringOne,&baseStream,false);
-
+  ser.writeUTF(stringOne, &baseStream, false);
 
-  ser.readUTF(verifyString,&baseStream,false);
+  ser.readUTF(verifyString, &baseStream, false);
 
   REQUIRE(verifyString == stringOne);
 
-
-
-
 }
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/Site2SiteTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/Site2SiteTests.cpp 
b/libminifi/test/unit/Site2SiteTests.cpp
new file mode 100644
index 0000000..a67881d
--- /dev/null
+++ b/libminifi/test/unit/Site2SiteTests.cpp
@@ -0,0 +1,202 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "io/BaseStream.h"
+#include "Site2SitePeer.h"
+#include "Site2SiteClientProtocol.h"
+#include <uuid/uuid.h>
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "SiteToSiteHelper.h"
+#include <algorithm>
+#include <string>
+#include <memory>
+#include "../TestBase.h"
+#define FMT_DEFAULT fmt_lower
+
+using namespace org::apache::nifi::minifi::io;
+TEST_CASE("TestSetPortId", "[S2S1]") {
+  // A lower-case UUID string must come back unchanged from getPortId()
+  // after its parsed binary form has been handed to setPortId().
+  const std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538";
+
+  // Build the peer around a plain DataStream; "fake_host" and 65433 are
+  // just the placeholder endpoint passed to the constructor.
+  std::unique_ptr<DataStream> stream(new DataStream());
+  std::unique_ptr<minifi::Site2SitePeer> fake_peer(
+      new minifi::Site2SitePeer(std::move(stream), "fake_host", 65433));
+
+  minifi::Site2SiteClientProtocol protocol(std::move(fake_peer));
+
+  // Parse the text form into a uuid_t and install it as the port id.
+  uuid_t port_uuid;
+  uuid_parse(uuid_str.c_str(), port_uuid);
+  protocol.setPortId(port_uuid);
+
+  REQUIRE(uuid_str == protocol.getPortId());
+
+}
+
+TEST_CASE("TestSetPortIdUppercase", "[S2S2]") {
+  // The test expects getPortId() not to echo an upper-case UUID string
+  // verbatim, and to match it once the string has been lower-cased.
+  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+
+  // Build the peer around a plain DataStream; "fake_host" and 65433 are
+  // just the placeholder endpoint passed to the constructor.
+  std::unique_ptr<DataStream> stream(new DataStream());
+  std::unique_ptr<minifi::Site2SitePeer> fake_peer(
+      new minifi::Site2SitePeer(std::move(stream), "fake_host", 65433));
+
+  minifi::Site2SiteClientProtocol protocol(std::move(fake_peer));
+
+  // Parse the text form into a uuid_t and install it as the port id.
+  uuid_t port_uuid;
+  uuid_parse(uuid_str.c_str(), port_uuid);
+  protocol.setPortId(port_uuid);
+
+  REQUIRE(uuid_str != protocol.getPortId());
+  // Lower-case the expected value in place, then compare again.
+  std::transform(uuid_str.begin(), uuid_str.end(), uuid_str.begin(),
+                 ::tolower);
+  REQUIRE(uuid_str == protocol.getPortId());
+
+}
+
+// Queues the canned server replies used by the sunny-path test. They are
+// served first-in, first-out: 0x14 (RESOURCE_OK), then the handshake
+// response code as three one-byte replies ("R", "C", 0x1), then 0x14 again
+// for the codec negotiation.
+void sunny_path_bootstrap(SiteToSiteResponder *collector) {
+  const char resource_ok = 0x14;  // RESOURCE_OK
+  const char handshake_status = 0x1;
+
+  // First reply
+  collector->push_response(std::string(1, resource_ok));
+
+  // Handshake respond code
+  collector->push_response("R");
+  collector->push_response("C");
+  collector->push_response(std::string(1, handshake_status));
+
+  // Codec Negotiation
+  collector->push_response(std::string(1, resource_ok));
+
+}
+
+TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
+  // Bootstraps against scripted replies, then checks what the client wrote.
+  SiteToSiteResponder *collector = new SiteToSiteResponder();
+  // Queue the scripted server replies before the protocol is started.
+  sunny_path_bootstrap(collector);
+  // The unique_ptr below owns collector; the raw pointer only inspects it.
+  std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr
+      < minifi::Site2SitePeer
+      > (new minifi::Site2SitePeer(
+          std::unique_ptr < minifi::io::DataStream > (collector), "fake_host",
+          65433));
+
+  minifi::Site2SiteClientProtocol protocol(std::move(peer));
+
+  std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+
+  uuid_t fakeUUID;
+
+  uuid_parse(uuid_str.c_str(), fakeUUID);
+
+  protocol.setPortId(fakeUUID);
+  // Run the negotiation; the test requires it to report success.
+  REQUIRE(true == protocol.bootstrap());
+  // Each call pops one recorded writeData() buffer; bare calls skip one.
+  REQUIRE(collector->get_next_client_response() == "NiFi");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "SocketFlowFileProtocol");
+  collector->get_next_client_response();
+  collector->get_next_client_response();
+  collector->get_next_client_response();
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "nifi://fake_host:65433");
+  collector->get_next_client_response();
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "GZIP");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "false");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "PORT_IDENTIFIER");
+  collector->get_next_client_response();
+  REQUIRE(
+      collector->get_next_client_response()
+          == "c56a4180-65aa-42ec-a945-5fd21dec0538");  // lower-cased port id
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == 
"REQUEST_EXPIRATION_MILLIS");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "30000");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec");
+  collector->get_next_client_response(); // codec version
+
+  // Bootstrap output verified; now exercise the send path.
+  // Create the transaction (createTransaction receives transactionID by name)
+  std::string transactionID;
+  std::string payload = "Test MiNiFi payload";
+  minifi::Transaction *transaction;
+  transaction = protocol.createTransaction(transactionID, minifi::SEND);
+  collector->get_next_client_response();
+  REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
+  std::map < std::string, std::string > attributes;  // left empty
+  minifi::DataPacket packet(&protocol, transaction, attributes, payload);
+  REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == true);
+  collector->get_next_client_response();
+  collector->get_next_client_response();
+  std::string rx_payload = collector->get_next_client_response();
+  ;  // stray empty statement; has no effect
+  REQUIRE(payload == rx_payload);
+
+}
+
+TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
+  // bootstrap() must report failure when every scripted server reply is the
+  // single byte 0xFF.
+
+  SiteToSiteResponder *responder = new SiteToSiteResponder();
+
+  // Queue the same one-byte reply twice.
+  char bad_code = 0xFF;
+  const std::string bad_reply(1, bad_code);
+  for (int queued = 0; queued < 2; ++queued) {
+    responder->push_response(bad_reply);
+  }
+
+  // The responder is owned by the unique_ptr that is handed to the peer.
+  std::unique_ptr<minifi::io::DataStream> stream(responder);
+  std::unique_ptr<minifi::Site2SitePeer> fake_peer(
+      new minifi::Site2SitePeer(std::move(stream), "fake_host", 65433));
+
+  minifi::Site2SiteClientProtocol protocol(std::move(fake_peer));
+
+  // Install the port id parsed from the textual UUID before bootstrapping.
+  const std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
+  uuid_t port_uuid;
+  uuid_parse(uuid_str.c_str(), port_uuid);
+  protocol.setPortId(port_uuid);
+
+  REQUIRE(false == protocol.bootstrap());
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/41a5e62d/libminifi/test/unit/SiteToSiteHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/SiteToSiteHelper.h 
b/libminifi/test/unit/SiteToSiteHelper.h
new file mode 100755
index 0000000..36a7000
--- /dev/null
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_
+#define LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_
+
+#include <queue>
+#include "io/BaseStream.h"
+#include "io/EndianCheck.h"
+#include "core/Core.h"
+/**
+ * Scripted stream for site-to-site tests: each read call pops one reply
+ * queued via push_response(), and each writeData() call is recorded so a
+ * test can inspect it with get_next_client_response().
+ */
+class SiteToSiteResponder: public minifi::io::BaseStream {
+private:
+  std::queue<std::string> server_responses_;  // replies handed to read calls
+  std::queue<std::string> client_responses_;  // buffers seen by writeData()
+
+  // Pops and returns the front of queue. An empty queue yields "" because
+  // calling front() on an empty std::queue is undefined behaviour.
+  static std::string pop_or_empty(std::queue<std::string> &queue) {
+    if (queue.empty()) {
+      return std::string();
+    }
+    std::string front = queue.front();
+    queue.pop();
+    return front;
+  }
+
+  // Copies at most cap bytes of src into dst, so a reply longer than the
+  // caller's buffer cannot overrun it. Returns the number of bytes copied.
+  static size_t copy_capped(uint8_t *dst, int cap, const std::string &src) {
+    if (dst == nullptr || cap <= 0) {
+      return 0;
+    }
+    const size_t room = static_cast<size_t>(cap);
+    const size_t count = src.size() < room ? src.size() : room;
+    memcpy(dst, src.data(), count);
+    return count;
+  }
+public:
+  SiteToSiteResponder() {
+  }
+  // initialize
+  virtual short initialize() {
+    return 1;
+  }
+
+  // Queues one server reply; replies are served in insertion order.
+  void push_response(std::string resp) {
+    server_responses_.push(resp);
+  }
+
+  // Returns and removes the next queued reply, or "" when none is left.
+  std::string get_next_response() {
+    return pop_or_empty(server_responses_);
+  }
+
+  // Records the size bytes at value as one client response; returns size.
+  int writeData(uint8_t *value, int size) {
+    client_responses_.push(
+        std::string(reinterpret_cast<const char*>(value), size));
+    return size;
+  }
+
+  // True while at least one recorded client write has not been popped.
+  bool has_next_client_response() const {
+    return !client_responses_.empty();
+  }
+
+  // Returns and removes the oldest recorded write, or "" when none is left.
+  std::string get_next_client_response() {
+    return pop_or_empty(client_responses_);
+  }
+
+  // Reads one byte: the first byte of the next queued reply, or 0 when that
+  // reply is empty. Always returns 1.
+  virtual int read(uint8_t &value) {
+    value = get_next_response().c_str()[0];
+    return 1;
+  }
+
+  // Reads a 16-bit value by parsing the next queued reply as text with
+  // std::stoi, which throws when the reply is not numeric. The
+  // is_little_endian flag is not used. Always returns 2.
+  virtual int read(uint16_t &base_value, bool is_little_endian =
+      minifi::io::EndiannessCheck::IS_LITTLE) {
+    base_value = std::stoi(get_next_response());
+    return 2;
+  }
+
+  // Reads one char: the first char of the next queued reply. Returns 1.
+  virtual int read(char &value) {
+    value = get_next_response().c_str()[0];
+    return 1;
+  }
+
+  // Reads the next queued reply into value, copying no more than len bytes;
+  // bytes of value past the end of a shorter reply are left untouched.
+  // Always returns len.
+  virtual int read(uint8_t *value, int len) {
+    copy_capped(value, len, get_next_response());
+    return len;
+  }
+
+  // Zero-fills buf, then copies the next queued reply into it, truncated to
+  // buflen bytes. Returns the number of bytes copied.
+  virtual int readData(uint8_t *buf, int buflen) {
+    if (buf != nullptr && buflen > 0) {
+      memset(buf, 0x00, buflen);
+    }
+    return static_cast<int>(copy_capped(buf, buflen, get_next_response()));
+  }
+
+  // Reads a 32-bit value by parsing the next queued reply with std::stoul.
+  // The is_little_endian flag is not used. Always returns 4.
+  virtual int read(uint32_t &value, bool is_little_endian =
+      minifi::io::EndiannessCheck::IS_LITTLE) {
+    value = std::stoul(get_next_response());
+    return 4;
+  }
+
+  // Reads a 64-bit value by parsing the next queued reply with std::stoull.
+  // Returns 8, the width in bytes of the value (is_little_endian is unused).
+  virtual int read(uint64_t &value, bool is_little_endian =
+      minifi::io::EndiannessCheck::IS_LITTLE) {
+    value = std::stoull(get_next_response());
+    return 8;
+  }
+
+  // Reads a string: the next queued reply, taken verbatim. The widen flag is
+  // not used. Returns the length of str.
+  virtual int readUTF(std::string &str, bool widen = false) {
+    str = get_next_response();
+    return str.length();
+  }
+
+};
+
+#endif /* LIBMINIFI_TEST_UNIT_SITE2SITE_HELPER_H_ */

Reply via email to