[ 
https://issues.apache.org/jira/browse/MINIFICPP-558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686758#comment-16686758
 ] 

ASF GitHub Bot commented on MINIFICPP-558:
------------------------------------------

Github user arpadboda commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/437#discussion_r233504886
  
    --- Diff: extensions/coap/protocols/CoapC2Protocol.cpp ---
    @@ -0,0 +1,353 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "CoapC2Protocol.h"
    +#include "c2/PayloadSerializer.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace c2 {
    +
    +#include "coap_functions.h"
    +#include "io/BaseStream.h"
    +
    +CoapProtocol::CoapProtocol(std::string name, utils::Identifier uuid)
    +    : RESTSender(name, uuid),
    +      require_registration_(false),
    +      logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) {
    +}
    +
    +CoapProtocol::~CoapProtocol() {
    +}
    +
    +void CoapProtocol::initialize(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<Configure> &configure) {
    +  RESTSender::initialize(controller, configure);
    +  if (configure->get("nifi.c2.coap.connector.service", 
controller_service_name_)) {
    +    auto service = 
controller->getControllerService(controller_service_name_);
    +    coap_service_ = 
std::static_pointer_cast<controllers::CoapConnectorService>(service);
    +  } else {
    +    logger_->log_info("No CoAP connector configured, so using default 
service");
    +    coap_service_ = 
std::make_shared<controllers::CoapConnectorService>("cs", configure);
    +    coap_service_->onEnable();
    +  }
    +}
    +
    +C2Payload CoapProtocol::consumePayload(const std::string &url, const 
C2Payload &payload, Direction direction, bool async) {
    +  return RESTSender::consumePayload(url, payload, direction, false);
    +}
    +
    +int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const 
C2Payload &payload) {
    +  auto ident = payload.getIdentifier();
    +  auto state = payload.getStatus().getState();
    +  stream->writeUTF(ident);
    +  uint8_t payloadState = 0;
    +  switch (state) {
    +    case state::UpdateState::NESTED:
    +    case state::UpdateState::INITIATE:
    +    case state::UpdateState::FULLY_APPLIED:
    +    case state::UpdateState::READ_COMPLETE:
    +      payloadState = 0;
    +      break;
    +    case state::UpdateState::NOT_APPLIED:
    +    case state::UpdateState::PARTIALLY_APPLIED:
    +      payloadState = 1;
    +      break;
    +    case state::UpdateState::READ_ERROR:
    +      payloadState = 2;
    +      break;
    +    case state::UpdateState::SET_ERROR:
    +      payloadState = 3;
    +      break;
    +  }
    +  stream->write(&payloadState, 1);
    +  return 0;
    +}
    +
    +int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const C2Payload 
&payload) {
    +  bool byte;
    +  uint16_t size = 0;
    +
    +  std::string deviceIdent;
    +  // device identifier
    +  auto deviceInfo = getPayload("deviceInfo", payload);
    +  if (deviceInfo) {
    +    for (const auto &component : deviceInfo->getContent()) {
    +      if (!getString(&component, "identifier", &deviceIdent)) {
    +        break;
    +      }
    +    }
    +  }
    +  stream->writeUTF(deviceIdent, false);
    +  std::string agentIdent;
    +  // agent identifier
    +  auto agentInfo = getPayload("agentInfo", payload);
    +  if (agentInfo) {
    +    for (const auto &component : agentInfo->getContent()) {
    +      if (!getString(&component, "identifier", &agentIdent)) {
    +        break;
    +      }
    +    }
    +  }
    +  if (agentIdent.empty()) {
    +    return -1;
    +  }
    +  stream->writeUTF(agentIdent, false);
    +
    +  auto flowInfo = getPayload("flowInfo", payload);
    +
    +  if (flowInfo != nullptr) {
    +
    +    auto components = getPayload("components", flowInfo);
    +
    +    auto queues = getPayload("queues", flowInfo);
    +
    +    auto versionedFlowSnapshotURI = getPayload("versionedFlowSnapshotURI", 
flowInfo);
    +
    +    if (components && queues && versionedFlowSnapshotURI) {
    +      byte = true;
    +      stream->write(byte);
    +      size = components->getNestedPayloads().size();
    +      stream->write(size);
    +      // write statuses
    +      for (const auto &component : components->getNestedPayloads()) {
    +        stream->writeUTF(component.getLabel(), false);
    +        for (const auto &cmp : component.getContent()) {
    +          auto exists = cmp.operation_arguments.find("running");
    +          byte = 0x00;
    +          if (exists != cmp.operation_arguments.end()) {
    +            const auto &node = exists->second.getValue();
    +            if (auto sub_type = 
std::dynamic_pointer_cast<state::response::BoolValue>(node)) {
    +              byte = sub_type->getValue();
    +            }
    +          }
    +          stream->write(byte);
    +        }
    +      }
    +
    +      size = queues->getNestedPayloads().size();
    +      stream->write(size);
    +      // write statuses
    +      for (const auto &component : queues->getNestedPayloads()) {
    +        stream->writeUTF(component.getLabel(), false);
    +        for (const auto &cmp : component.getContent()) {
    +          uint64_t datasize = 0, datasizemax = 0, size = 0, sizemax = 0;
    +          getLong(&cmp, "dataSize", &datasize);
    +          getLong(&cmp, "dataSizeMax", &datasizemax);
    +          getLong(&cmp, "size", &size);
    +          getLong(&cmp, "sizeMax", &sizemax);
    +          stream->write(datasize);
    +          stream->write(datasizemax);
    +          stream->write(size);
    +          stream->write(sizemax);
    +        }
    +      }
    +      std::string bucketId = "default", flowid = "";
    +      for (const auto &cmp : versionedFlowSnapshotURI->getContent()) {
    +        auto bid = cmp.operation_arguments.find("bucketId");
    +        if (bid != cmp.operation_arguments.end()) {
    +          bucketId = bid->second.to_string();
    +        }
    +
    +        auto flowId = cmp.operation_arguments.find("flowId");
    +        if (flowId != cmp.operation_arguments.end()) {
    +          flowid = flowId->second.to_string();
    +        }
    +      }
    +      stream->writeUTF(bucketId);
    +      stream->writeUTF(flowid);
    +
    +    } else {
    +      byte = false;
    +      stream->write(byte);
    +    }
    +
    +  } else {
    +    byte = false;
    +    stream->write(byte);
    +  }
    +  return 0;
    +}
    +
    +Operation CoapProtocol::getOperation(int type) {
    +  switch (type) {
    +    case 0:
    +      return ACKNOWLEDGE;
    +    case 1:
    +      return HEARTBEAT;
    +    case 2:
    +      return CLEAR;
    +    case 3:
    +      return DESCRIBE;
    +    case 4:
    +      return RESTART;
    +    case 5:
    +      return START;
    +    case 6:
    +      return UPDATE;
    +    case 7:
    +      return STOP;
    +  }
    +  return ACKNOWLEDGE;
    +}
    +
    +C2Payload CoapProtocol::serialize(const C2Payload &payload) {
    +  if (nullptr == coap_service_) {
    +    return C2Payload(payload.getOperation(), 
state::UpdateState::READ_ERROR, true);
    +  }
    +
    +  if (require_registration_) {
    +    logger_->log_debug("Server requested agent registration, so 
attempting");
    +    auto response = RESTSender::consumePayload(rest_uri_, payload, 
TRANSMIT, false);
    +    if (response.getStatus().getState() == state::UpdateState::READ_ERROR) 
{
    +      logger_->log_trace("Could not register");
    +      return C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true);
    +    } else {
    +      logger_->log_trace("Registered agent.");
    +    }
    +    require_registration_ = false;
    +
    +    return C2Payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true);
    +
    +  }
    +
    +  uint16_t version = 0;
    +  uint8_t payload_type = 0;
    +  uint64_t payload_u64 = 0;
    +  uint16_t size = 0;
    +  io::BaseStream stream;
    +
    +  stream.write(version);
    +  std::string endpoint = "heartbeat";
    +  switch (payload.getOperation()) {
    +    case ACKNOWLEDGE:
    +      endpoint = "acknowledge";
    +      payload_type = 0;
    +      stream.write(&payload_type, 1);
    +      if (writeAcknowledgement(&stream, payload) != 0) {
    +        return C2Payload(payload.getOperation(), 
state::UpdateState::READ_ERROR, true);
    +      }
    +      break;
    +    case HEARTBEAT:
    +      payload_type = 1;
    +      stream.write(&payload_type, 1);
    +      if (writeHeartbeat(&stream, payload) != 0) {
    +        return C2Payload(payload.getOperation(), 
state::UpdateState::READ_ERROR, true);
    +      }
    +      break;
    +  };
    +
    +  size_t bsize = stream.getSize();
    +  auto buffer = (unsigned char*) stream.getBuffer();
    +
    +  controllers::CoapConnectorService::CoAPMessage 
message(coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, buffer, bsize));
    +
    +  if (message.isRegistrationRequest()) {
    +    require_registration_ = true;
    +  } else if (message.getSize() > 0) {
    +    io::DataStream byteStream(message.getData(), message.getSize());
    +    io::BaseStream responseStream(&byteStream);
    +    responseStream.read(version);
    +    responseStream.read(size);
    +    C2Payload new_payload(payload.getOperation(), 
state::UpdateState::NESTED, true);
    +    for (int i = 0; i < size; i++) {
    +
    +      uint8_t operationType;
    +      uint16_t argsize = 0;
    +      std::string operand, id;
    +      responseStream.read(operationType);
    +      responseStream.readUTF(id, false);
    +      responseStream.readUTF(operand, false);
    +
    +      auto newOp = getOperation(operationType);
    +      C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, 
true);
    +      nested_payload.setIdentifier(id);
    +      C2ContentResponse new_command(newOp);
    +      new_command.delay = 0;
    +      new_command.required = true;
    +      new_command.ttl = -1;
    +      new_command.name = operand;
    +      new_command.ident = id;
    +      responseStream.read(argsize);
    +      for (int j = 0; j < argsize; j++) {
    +        std::string key, value;
    +        responseStream.readUTF(key);
    +        responseStream.readUTF(value);
    +        new_command.operation_arguments[key] = value;
    +      }
    +
    +      nested_payload.addContent(std::move(new_command));
    +      new_payload.addPayload(std::move(nested_payload));
    +    }
    +    return new_payload;
    +  }
    +
    +  return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, 
true);
    +}
    +
    +const C2Payload * const CoapProtocol::getPayload(const std::string &name, 
const C2Payload * const payload) {
    +  if (payload->getLabel() == name) {
    +    return payload;
    +  }
    +  for (const auto &nested_payload : payload->getNestedPayloads()) {
    +    if (nested_payload.getLabel() == name) {
    +      return &nested_payload;
    +    }
    +  }
    +  return nullptr;
    +}
    +
    +int8_t CoapProtocol::getLong(const C2ContentResponse * const cmp, const 
std::string &name, uint64_t *value) {
    +  auto exists = cmp->operation_arguments.find(name);
    +  if (exists != cmp->operation_arguments.end()) {
    +    const auto &node = exists->second.getValue();
    +    if (auto sub_type = 
std::dynamic_pointer_cast<state::response::Int64Value>(node)) {
    +      *value = sub_type->getValue();
    +      return 0;
    +    }
    +  }
    +  return -1;
    +}
    +
    +int8_t CoapProtocol::getString(const C2ContentResponse * const cmp, const 
std::string &name, std::string *value) {
    --- End diff --
    
    As it's C++, why value is a pointer (ref would be fine)?
    Moreover I don't see why to have 0 or -1 return codes, empty string would 
be fine as well to indicate the lack of existence. 


> Move PayloadSerializer in preparation for Coap
> ----------------------------------------------
>
>                 Key: MINIFICPP-558
>                 URL: https://issues.apache.org/jira/browse/MINIFICPP-558
>             Project: NiFi MiNiFi C++
>          Issue Type: Bug
>            Reporter: Mr TheSegfault
>            Assignee: Mr TheSegfault
>            Priority: Major
>             Fix For: 0.6.0
>
>
> Move PayloadSerializer



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to