[ https://issues.apache.org/jira/browse/MINIFICPP-558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686758#comment-16686758 ]
ASF GitHub Bot commented on MINIFICPP-558: ------------------------------------------ Github user arpadboda commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/437#discussion_r233504886 --- Diff: extensions/coap/protocols/CoapC2Protocol.cpp --- @@ -0,0 +1,353 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "CoapC2Protocol.h" +#include "c2/PayloadSerializer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +#include "coap_functions.h" +#include "io/BaseStream.h" + +CoapProtocol::CoapProtocol(std::string name, utils::Identifier uuid) + : RESTSender(name, uuid), + require_registration_(false), + logger_(logging::LoggerFactory<CoapProtocol>::getLogger()) { +} + +CoapProtocol::~CoapProtocol() { +} + +void CoapProtocol::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + RESTSender::initialize(controller, configure); + if (configure->get("nifi.c2.coap.connector.service", controller_service_name_)) { + auto service = controller->getControllerService(controller_service_name_); + coap_service_ = std::static_pointer_cast<controllers::CoapConnectorService>(service); + } else { + logger_->log_info("No CoAP connector configured, so using default service"); + coap_service_ = std::make_shared<controllers::CoapConnectorService>("cs", configure); + coap_service_->onEnable(); + } +} + +C2Payload CoapProtocol::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { + return RESTSender::consumePayload(url, payload, direction, false); +} + +int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const C2Payload &payload) { + auto ident = payload.getIdentifier(); + auto state = payload.getStatus().getState(); + stream->writeUTF(ident); + uint8_t payloadState = 0; + switch (state) { + case state::UpdateState::NESTED: + case state::UpdateState::INITIATE: + case state::UpdateState::FULLY_APPLIED: + case state::UpdateState::READ_COMPLETE: + payloadState = 0; + break; + case state::UpdateState::NOT_APPLIED: + case state::UpdateState::PARTIALLY_APPLIED: + payloadState = 1; + break; + case state::UpdateState::READ_ERROR: + payloadState = 2; + break; + case state::UpdateState::SET_ERROR: + 
payloadState = 3; + break; + } + stream->write(&payloadState, 1); + return 0; +} + +int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const C2Payload &payload) { + bool byte; + uint16_t size = 0; + + std::string deviceIdent; + // device identifier + auto deviceInfo = getPayload("deviceInfo", payload); + if (deviceInfo) { + for (const auto &component : deviceInfo->getContent()) { + if (!getString(&component, "identifier", &deviceIdent)) { + break; + } + } + } + stream->writeUTF(deviceIdent, false); + std::string agentIdent; + // agent identifier + auto agentInfo = getPayload("agentInfo", payload); + if (agentInfo) { + for (const auto &component : agentInfo->getContent()) { + if (!getString(&component, "identifier", &agentIdent)) { + break; + } + } + } + if (agentIdent.empty()) { + return -1; + } + stream->writeUTF(agentIdent, false); + + auto flowInfo = getPayload("flowInfo", payload); + + if (flowInfo != nullptr) { + + auto components = getPayload("components", flowInfo); + + auto queues = getPayload("queues", flowInfo); + + auto versionedFlowSnapshotURI = getPayload("versionedFlowSnapshotURI", flowInfo); + + if (components && queues && versionedFlowSnapshotURI) { + byte = true; + stream->write(byte); + size = components->getNestedPayloads().size(); + stream->write(size); + // write statuses + for (const auto &component : components->getNestedPayloads()) { + stream->writeUTF(component.getLabel(), false); + for (const auto &cmp : component.getContent()) { + auto exists = cmp.operation_arguments.find("running"); + byte = 0x00; + if (exists != cmp.operation_arguments.end()) { + const auto &node = exists->second.getValue(); + if (auto sub_type = std::dynamic_pointer_cast<state::response::BoolValue>(node)) { + byte = sub_type->getValue(); + } + } + stream->write(byte); + } + } + + size = queues->getNestedPayloads().size(); + stream->write(size); + // write statuses + for (const auto &component : queues->getNestedPayloads()) { + 
stream->writeUTF(component.getLabel(), false); + for (const auto &cmp : component.getContent()) { + uint64_t datasize = 0, datasizemax = 0, size = 0, sizemax = 0; + getLong(&cmp, "dataSize", &datasize); + getLong(&cmp, "dataSizeMax", &datasizemax); + getLong(&cmp, "size", &size); + getLong(&cmp, "sizeMax", &sizemax); + stream->write(datasize); + stream->write(datasizemax); + stream->write(size); + stream->write(sizemax); + } + } + std::string bucketId = "default", flowid = ""; + for (const auto &cmp : versionedFlowSnapshotURI->getContent()) { + auto bid = cmp.operation_arguments.find("bucketId"); + if (bid != cmp.operation_arguments.end()) { + bucketId = bid->second.to_string(); + } + + auto flowId = cmp.operation_arguments.find("flowId"); + if (flowId != cmp.operation_arguments.end()) { + flowid = flowId->second.to_string(); + } + } + stream->writeUTF(bucketId); + stream->writeUTF(flowid); + + } else { + byte = false; + stream->write(byte); + } + + } else { + byte = false; + stream->write(byte); + } + return 0; +} + +Operation CoapProtocol::getOperation(int type) { + switch (type) { + case 0: + return ACKNOWLEDGE; + case 1: + return HEARTBEAT; + case 2: + return CLEAR; + case 3: + return DESCRIBE; + case 4: + return RESTART; + case 5: + return START; + case 6: + return UPDATE; + case 7: + return STOP; + } + return ACKNOWLEDGE; +} + +C2Payload CoapProtocol::serialize(const C2Payload &payload) { + if (nullptr == coap_service_) { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } + + if (require_registration_) { + logger_->log_debug("Server requested agent registration, so attempting"); + auto response = RESTSender::consumePayload(rest_uri_, payload, TRANSMIT, false); + if (response.getStatus().getState() == state::UpdateState::READ_ERROR) { + logger_->log_trace("Could not register"); + return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); + } else { + logger_->log_trace("Registered agent."); + } + 
require_registration_ = false; + + return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); + + } + + uint16_t version = 0; + uint8_t payload_type = 0; + uint64_t payload_u64 = 0; + uint16_t size = 0; + io::BaseStream stream; + + stream.write(version); + std::string endpoint = "heartbeat"; + switch (payload.getOperation()) { + case ACKNOWLEDGE: + endpoint = "acknowledge"; + payload_type = 0; + stream.write(&payload_type, 1); + if (writeAcknowledgement(&stream, payload) != 0) { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } + break; + case HEARTBEAT: + payload_type = 1; + stream.write(&payload_type, 1); + if (writeHeartbeat(&stream, payload) != 0) { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } + break; + }; + + size_t bsize = stream.getSize(); + auto buffer = (unsigned char*) stream.getBuffer(); + + controllers::CoapConnectorService::CoAPMessage message(coap_service_->sendPayload(COAP_REQUEST_POST, endpoint, buffer, bsize)); + + if (message.isRegistrationRequest()) { + require_registration_ = true; + } else if (message.getSize() > 0) { + io::DataStream byteStream(message.getData(), message.getSize()); + io::BaseStream responseStream(&byteStream); + responseStream.read(version); + responseStream.read(size); + C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + for (int i = 0; i < size; i++) { + + uint8_t operationType; + uint16_t argsize = 0; + std::string operand, id; + responseStream.read(operationType); + responseStream.readUTF(id, false); + responseStream.readUTF(operand, false); + + auto newOp = getOperation(operationType); + C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); + nested_payload.setIdentifier(id); + C2ContentResponse new_command(newOp); + new_command.delay = 0; + new_command.required = true; + new_command.ttl = -1; + new_command.name = operand; + new_command.ident = id; + 
responseStream.read(argsize); + for (int j = 0; j < argsize; j++) { + std::string key, value; + responseStream.readUTF(key); + responseStream.readUTF(value); + new_command.operation_arguments[key] = value; + } + + nested_payload.addContent(std::move(new_command)); + new_payload.addPayload(std::move(nested_payload)); + } + return new_payload; + } + + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); +} + +const C2Payload * const CoapProtocol::getPayload(const std::string &name, const C2Payload * const payload) { + if (payload->getLabel() == name) { + return payload; + } + for (const auto &nested_payload : payload->getNestedPayloads()) { + if (nested_payload.getLabel() == name) { + return &nested_payload; + } + } + return nullptr; +} + +int8_t CoapProtocol::getLong(const C2ContentResponse * const cmp, const std::string &name, uint64_t *value) { + auto exists = cmp->operation_arguments.find(name); + if (exists != cmp->operation_arguments.end()) { + const auto &node = exists->second.getValue(); + if (auto sub_type = std::dynamic_pointer_cast<state::response::Int64Value>(node)) { + *value = sub_type->getValue(); + return 0; + } + } + return -1; +} + +int8_t CoapProtocol::getString(const C2ContentResponse * const cmp, const std::string &name, std::string *value) { --- End diff -- As it's C++, why is value a pointer (a reference would be fine)? Moreover, I don't see why we need 0 or -1 return codes; an empty string would work just as well to indicate that the value does not exist. > Move PayloadSerializer in preparation for Coap > ---------------------------------------------- > > Key: MINIFICPP-558 > URL: https://issues.apache.org/jira/browse/MINIFICPP-558 > Project: NiFi MiNiFi C++ > Issue Type: Bug > Reporter: Mr TheSegfault > Assignee: Mr TheSegfault > Priority: Major > Fix For: 0.6.0 > > > Move PayloadSerializer -- This message was sent by Atlassian JIRA (v7.6.3#76005)