[GitHub] nifi pull request #2375: autoclosesocket
GitHub user guangxuewu opened a pull request: https://github.com/apache/nifi/pull/2375 autoclosesocket Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/guangxuewu/nifi master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2375.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2375 commit 7c9456d25b273bd8586a9352772801f5d9b462be Author: guangxuewuDate: 2018-01-05T05:46:55Z autoclosesocket ---
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312453#comment-16312453 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097 --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt --- @@ -0,0 +1,86 @@ +#*** +# Copyright (c) 2015, 2017 logi.cals GmbH and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +#http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Rainer Poisel - initial version +# Genis Riera Perez - Add support for building debian package +#***/ + +# Note: on OS X you should install XCode and the associated command-line tools + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) +PROJECT("paho" C) +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) + +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake") +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +## build settings +SET(PAHO_VERSION_MAJOR 1) +SET(PAHO_VERSION_MINOR 2) +SET(PAHO_VERSION_PATCH 0) +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) + +INCLUDE(GNUInstallDirs) + +STRING(TIMESTAMP BUILD_TIMESTAMP UTC) +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}") + +IF(WIN32) + ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD) +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + ADD_DEFINITIONS(-DOSX) +ENDIF() + +## build options +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ") +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library") +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)") +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs") +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package") +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run") + +ADD_SUBDIRECTORY(src) +IF(PAHO_BUILD_SAMPLES) +ADD_SUBDIRECTORY(src/samples) +ENDIF() + +IF(PAHO_BUILD_DOCUMENTATION) +ADD_SUBDIRECTORY(doc) +ENDIF() + +### packaging settings +IF (WIN32) +SET(CPACK_GENERATOR "ZIP") +ELSEIF(PAHO_BUILD_DEB_PACKAGE) +SET(CPACK_GENERATOR "DEB") +CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in +${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY) +SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake) +ADD_SUBDIRECTORY(debian) +ELSE() +SET(CPACK_GENERATOR "TGZ") +ENDIF() + +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR}) +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR}) +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH}) +INCLUDE(CPack) + +IF(PAHO_ENABLE_TESTING) --- End diff -- will remove > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312451#comment-16312451 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +retain_ = false; +max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string , MQTTClient client, +int qos, bool retain, MQTTClient_deliveryToken ) : +flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), +qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + if (flow_size_ < max_seg_size_) +max_seg_size_ = flow_size_; + std::vector buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { +int readRet = stream->read([0], max_seg_size_); +if (readRet < 0) { + status_ = -1; + return read_size_; +} +if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = [0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), , _) != MQTTCLIENT_SUCCESS) { --- End diff -- it add the MQTT header and call socket write. the deliverable callback is for QOS. /** * This is a callback function. The client application * must provide an implementation of this function to enable asynchronous * notification of delivery of messages. The function is registered with the * client library by passing it as an argument to MQTTClient_setCallbacks(). * It is called by the client library after the client application has * published a message to the server. It indicates that the necessary * handshaking and acknowledgements for the requested quality of service (see * MQTTClient_message.qos) have been completed. This function is executed on a * separate thread to the one on which the client application is running. * Note:MQTTClient_deliveryComplete() is not called when messages are * published at QoS0. * @param context A pointer to the context value originally
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312452#comment-16312452 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086 --- Diff: thirdparty/paho.mqtt.c/.travis.yml --- @@ -0,0 +1,47 @@ +sudo: true --- End diff -- will remove > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +retain_ = false; +max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string , MQTTClient client, +int qos, bool retain, MQTTClient_deliveryToken ) : +flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), +qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + if (flow_size_ < max_seg_size_) +max_seg_size_ = flow_size_; + std::vector buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { +int readRet = stream->read([0], max_seg_size_); +if (readRet < 0) { + status_ = -1; + return read_size_; +} +if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = [0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), , _) != MQTTCLIENT_SUCCESS) { --- End diff -- it add the MQTT header and call socket write. the deliverable callback is for QOS. /** * This is a callback function. The client application * must provide an implementation of this function to enable asynchronous * notification of delivery of messages. The function is registered with the * client library by passing it as an argument to MQTTClient_setCallbacks(). * It is called by the client library after the client application has * published a message to the server. It indicates that the necessary * handshaking and acknowledgements for the requested quality of service (see * MQTTClient_message.qos) have been completed. This function is executed on a * separate thread to the one on which the client application is running. * Note:MQTTClient_deliveryComplete() is not called when messages are * published at QoS0. * @param context A pointer to the context value originally passed to * MQTTClient_setCallbacks(), which contains any application-specific context. * @param dt The ::MQTTClient_deliveryToken associated with * the published message. Applications can check that all messages have been *
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097 --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt --- @@ -0,0 +1,86 @@ +#*** +# Copyright (c) 2015, 2017 logi.cals GmbH and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +#http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Rainer Poisel - initial version +# Genis Riera Perez - Add support for building debian package +#***/ + +# Note: on OS X you should install XCode and the associated command-line tools + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) +PROJECT("paho" C) +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) + +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake") +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +## build settings +SET(PAHO_VERSION_MAJOR 1) +SET(PAHO_VERSION_MINOR 2) +SET(PAHO_VERSION_PATCH 0) +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) + +INCLUDE(GNUInstallDirs) + +STRING(TIMESTAMP BUILD_TIMESTAMP UTC) +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}") + +IF(WIN32) + ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD) +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + ADD_DEFINITIONS(-DOSX) +ENDIF() + +## build options +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ") +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library") +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)") +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs") +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package") +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run") + +ADD_SUBDIRECTORY(src) +IF(PAHO_BUILD_SAMPLES) +ADD_SUBDIRECTORY(src/samples) +ENDIF() + +IF(PAHO_BUILD_DOCUMENTATION) +ADD_SUBDIRECTORY(doc) +ENDIF() + +### packaging settings +IF (WIN32) +SET(CPACK_GENERATOR "ZIP") +ELSEIF(PAHO_BUILD_DEB_PACKAGE) +SET(CPACK_GENERATOR "DEB") +CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in +${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY) +SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake) +ADD_SUBDIRECTORY(debian) +ELSE() +SET(CPACK_GENERATOR "TGZ") +ENDIF() + +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR}) +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR}) +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH}) +INCLUDE(CPack) + +IF(PAHO_ENABLE_TESTING) --- End diff -- will remove ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086 --- Diff: thirdparty/paho.mqtt.c/.travis.yml --- @@ -0,0 +1,47 @@ +sudo: true --- End diff -- will remove ---
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312444#comment-16312444 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405 --- Diff: extensions/mqtt/ConsumeMQTT.h --- @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +isSubscriber_ = true; +maxQueueSize_ = 100; + } + // Destructor + virtual ~ConsumeMQTT() { +std::lock_guard < std::mutex > lock(mutex_); +while (!queue_.empty()) { + MQTTClient_message *message = queue_.front(); + MQTTClient_freeMessage(); + queue_.pop_front(); +} + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxQueueSize; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; +} +MQTTClient_message *message_; +int64_t process(std::shared_ptr stream) { + int64_t len = stream->write(reinterpret_cast(message_->payload), message_->payloadlen); + if (len < 0) +status_ = -1; + return len; +} +int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr , const std::shared_ptr ); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque _queue) { +std::lock_guard < std::mutex > lock(mutex_); --- End diff -- will do > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312443#comment-16312443 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371 --- Diff: extensions/mqtt/AbstractMQTTProcessor.h --- @@ -0,0 +1,154 @@ +/** + * @file AbstractMQTTProcessor.h + * AbstractMQTTProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ABSTRACTMQTT_H__ +#define __ABSTRACTMQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" + +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AbstractMQTTProcessor Class +class AbstractMQTTProcessor : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +client_ = nullptr; +cleanSession_ = false; +keepAliveInterval_ = 60; +connectionTimeOut_ = 30; +qos_ = 0; +isSubscriber_ = false; + } + // Destructor + virtual ~AbstractMQTTProcessor() { +if (isSubscriber_) { + MQTTClient_unsubscribe(client_, topic_.c_str()); --- End diff -- it should be. only side effect is if app halt and did not unsub. The broker will buffer the msgs to the topic while the client is go away. if the app restart with the same topic and same client ID, it will rx all msg buffer for that client buffered in the broker. > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405 --- Diff: extensions/mqtt/ConsumeMQTT.h --- @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +isSubscriber_ = true; +maxQueueSize_ = 100; + } + // Destructor + virtual ~ConsumeMQTT() { +std::lock_guard < std::mutex > lock(mutex_); +while (!queue_.empty()) { + MQTTClient_message *message = queue_.front(); + MQTTClient_freeMessage(); + queue_.pop_front(); +} + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxQueueSize; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; +} +MQTTClient_message *message_; +int64_t process(std::shared_ptr stream) { + int64_t len = stream->write(reinterpret_cast(message_->payload), message_->payloadlen); + if (len < 0) +status_ = -1; + return len; +} +int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr , const std::shared_ptr ); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque _queue) { +std::lock_guard < std::mutex > lock(mutex_); --- End diff -- will do ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371 --- Diff: extensions/mqtt/AbstractMQTTProcessor.h --- @@ -0,0 +1,154 @@ +/** + * @file AbstractMQTTProcessor.h + * AbstractMQTTProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ABSTRACTMQTT_H__ +#define __ABSTRACTMQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" + +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AbstractMQTTProcessor Class +class AbstractMQTTProcessor : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +client_ = nullptr; +cleanSession_ = false; +keepAliveInterval_ = 60; +connectionTimeOut_ = 30; +qos_ = 0; +isSubscriber_ = false; + } + // Destructor + virtual ~AbstractMQTTProcessor() { +if (isSubscriber_) { + MQTTClient_unsubscribe(client_, topic_.c_str()); --- End diff -- it should be. only side effect is if app halt and did not unsub. The broker will buffer the msgs to the topic while the client is go away. if the app restart with the same topic and same client ID, it will rx all msg buffer for that client buffered in the broker. ---
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312438#comment-16312438 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) &&
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312427#comment-16312427 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312426#comment-16312426 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) &&
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) &&
[jira] [Updated] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-355: - Resolution: Fixed Status: Resolved (was: Patch Available) Closed, pending additional issues found by the reporter. > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt >Assignee: marco polo > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo reassigned MINIFICPP-355: Assignee: marco polo > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt >Assignee: marco polo > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] marco polo updated MINIFICPP-355: - Status: Patch Available (was: Open) > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312326#comment-16312326 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312320#comment-16312320 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508 --- Diff: extensions/mqtt/AbstractMQTTProcessor.h --- @@ -0,0 +1,154 @@ +/** + * @file AbstractMQTTProcessor.h + * AbstractMQTTProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ABSTRACTMQTT_H__ +#define __ABSTRACTMQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" + +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AbstractMQTTProcessor Class +class AbstractMQTTProcessor : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +client_ = nullptr; +cleanSession_ = false; +keepAliveInterval_ = 60; +connectionTimeOut_ = 30; +qos_ = 0; +isSubscriber_ = false; + } + // Destructor + virtual ~AbstractMQTTProcessor() { +if (isSubscriber_) { + MQTTClient_unsubscribe(client_, topic_.c_str()); --- End diff -- what happens if unsubscribe is not called due to failure? Is that eventually okay? > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312318#comment-16312318 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312322#comment-16312322 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312319#comment-16312319 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312324#comment-16312324 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541 --- Diff: thirdparty/paho.mqtt.c/.travis.yml --- @@ -0,0 +1,47 @@ +sudo: true --- End diff -- this file doesn't seem like it's needed. > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312321#comment-16312321 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693 --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt --- @@ -0,0 +1,86 @@ +#*** +# Copyright (c) 2015, 2017 logi.cals GmbH and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +#http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Rainer Poisel - initial version +# Genis Riera Perez - Add support for building debian package +#***/ + +# Note: on OS X you should install XCode and the associated command-line tools + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) +PROJECT("paho" C) +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) + +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake") +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +## build settings +SET(PAHO_VERSION_MAJOR 1) +SET(PAHO_VERSION_MINOR 2) +SET(PAHO_VERSION_PATCH 0) +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) + +INCLUDE(GNUInstallDirs) + +STRING(TIMESTAMP BUILD_TIMESTAMP UTC) +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}") + +IF(WIN32) + ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD) +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + ADD_DEFINITIONS(-DOSX) +ENDIF() + +## build options +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ") +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library") +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)") +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs") +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package") +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run") + +ADD_SUBDIRECTORY(src) +IF(PAHO_BUILD_SAMPLES) +ADD_SUBDIRECTORY(src/samples) +ENDIF() + +IF(PAHO_BUILD_DOCUMENTATION) +ADD_SUBDIRECTORY(doc) +ENDIF() + +### packaging settings +IF (WIN32) +SET(CPACK_GENERATOR "ZIP") +ELSEIF(PAHO_BUILD_DEB_PACKAGE) +SET(CPACK_GENERATOR "DEB") +CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in +${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY) +SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake) +ADD_SUBDIRECTORY(debian) +ELSE() +SET(CPACK_GENERATOR "TGZ") +ENDIF() + +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR}) +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR}) +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH}) +INCLUDE(CPack) + +IF(PAHO_ENABLE_TESTING) --- End diff -- probably don't need tests, right? > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312325#comment-16312325 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068 --- Diff: extensions/mqtt/ConsumeMQTT.h --- @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +isSubscriber_ = true; +maxQueueSize_ = 100; + } + // Destructor + virtual ~ConsumeMQTT() { +std::lock_guard < std::mutex > lock(mutex_); +while (!queue_.empty()) { + MQTTClient_message *message = queue_.front(); + MQTTClient_freeMessage(); + queue_.pop_front(); +} + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxQueueSize; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; +} +MQTTClient_message *message_; +int64_t process(std::shared_ptr stream) { + int64_t len = stream->write(reinterpret_cast(message_->payload), message_->payloadlen); + if (len < 0) +status_ = -1; + return len; +} +int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr , const std::shared_ptr ); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque _queue) { +std::lock_guard < std::mutex > lock(mutex_); --- End diff -- Could you use a lock free queue here? might not save much but may increase throughput and we have one in our code base already. > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (MINIFICPP-342) MQTT framework
[ https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312323#comment-16312323 ] ASF GitHub Bot commented on MINIFICPP-342: -- Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +retain_ = false; +max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string , MQTTClient client, +int qos, bool retain, MQTTClient_deliveryToken ) : +flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), +qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + if (flow_size_ < max_seg_size_) +max_seg_size_ = flow_size_; + std::vector buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { +int readRet = stream->read([0], max_seg_size_); +if (readRet < 0) { + status_ = -1; + return read_size_; +} +if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = [0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), , _) != MQTTCLIENT_SUCCESS) { --- End diff -- does the publish copy the buffer? If not, does it finish entirely or will it create a callback ? I ask because you're passing in the buffer as the payload, but if there is a callback, we could possibly have memory that's freed when this function exits with the MQTTClient still in progress. > MQTT framework > -- > > Key: MINIFICPP-342 > URL: https://issues.apache.org/jira/browse/MINIFICPP-342 > Project: NiFi MiNiFi C++ > Issue Type: New Feature >Affects Versions: 0.3.0 >Reporter: bqiu >Assignee: bqiu > Fix For: 0.3.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068 --- Diff: extensions/mqtt/ConsumeMQTT.h --- @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +isSubscriber_ = true; +maxQueueSize_ = 100; + } + // Destructor + virtual ~ConsumeMQTT() { +std::lock_guard < std::mutex > lock(mutex_); +while (!queue_.empty()) { + MQTTClient_message *message = queue_.front(); + MQTTClient_freeMessage(); + queue_.pop_front(); +} + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxQueueSize; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; +} +MQTTClient_message *message_; +int64_t process(std::shared_ptr stream) { + int64_t len = stream->write(reinterpret_cast(message_->payload), message_->payloadlen); + if (len < 0) +status_ = -1; + return len; +} +int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr , const std::shared_ptr ); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque _queue) { +std::lock_guard < std::mutex > lock(mutex_); --- End diff -- Could you use a lock free queue here? might not save much but may increase throughput and we have one in our code base already. ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508 --- Diff: extensions/mqtt/AbstractMQTTProcessor.h --- @@ -0,0 +1,154 @@ +/** + * @file AbstractMQTTProcessor.h + * AbstractMQTTProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ABSTRACTMQTT_H__ +#define __ABSTRACTMQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" + +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AbstractMQTTProcessor Class +class AbstractMQTTProcessor : public core::Processor { + public: + // Constructor + /*! + * Create a new processor + */ + explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +client_ = nullptr; +cleanSession_ = false; +keepAliveInterval_ = 60; +connectionTimeOut_ = 30; +qos_ = 0; +isSubscriber_ = false; + } + // Destructor + virtual ~AbstractMQTTProcessor() { +if (isSubscriber_) { + MQTTClient_unsubscribe(client_, topic_.c_str()); --- End diff -- what happens if unsubscribe is not called due to failure? Is that eventually okay? ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! + * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +retain_ = false; +max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string , MQTTClient client, +int qos, bool retain, MQTTClient_deliveryToken ) : +flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), +qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + if (flow_size_ < max_seg_size_) +max_seg_size_ = flow_size_; + std::vector buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { +int readRet = stream->read([0], max_seg_size_); +if (readRet < 0) { + status_ = -1; + return read_size_; +} +if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = [0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), , _) != MQTTCLIENT_SUCCESS) { --- End diff -- does the publish copy the buffer? If not, does it finish entirely or will it create a callback ? I ask because you're passing in the buffer as the payload, but if there is a callback, we could possibly have memory that's freed when this function exits with the MQTTClient still in progress. ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541 --- Diff: thirdparty/paho.mqtt.c/.travis.yml --- @@ -0,0 +1,47 @@ +sudo: true --- End diff -- this file doesn't seem like it's needed. ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { +topic_ = value; +logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { +userName_ = value; +logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693 --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt --- @@ -0,0 +1,86 @@ +#*** +# Copyright (c) 2015, 2017 logi.cals GmbH and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +#http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Rainer Poisel - initial version +# Genis Riera Perez - Add support for building debian package +#***/ + +# Note: on OS X you should install XCode and the associated command-line tools + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) +PROJECT("paho" C) +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) + +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake") +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +## build settings +SET(PAHO_VERSION_MAJOR 1) +SET(PAHO_VERSION_MINOR 2) +SET(PAHO_VERSION_PATCH 0) +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) + +INCLUDE(GNUInstallDirs) + +STRING(TIMESTAMP BUILD_TIMESTAMP UTC) +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}") + +IF(WIN32) + ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD) +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + ADD_DEFINITIONS(-DOSX) +ENDIF() + +## build options +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ") +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library") +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)") +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs") +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package") +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run") + +ADD_SUBDIRECTORY(src) +IF(PAHO_BUILD_SAMPLES) +ADD_SUBDIRECTORY(src/samples) +ENDIF() + +IF(PAHO_BUILD_DOCUMENTATION) +ADD_SUBDIRECTORY(doc) +ENDIF() + +### packaging settings +IF (WIN32) +SET(CPACK_GENERATOR "ZIP") +ELSEIF(PAHO_BUILD_DEB_PACKAGE) +SET(CPACK_GENERATOR "DEB") +CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in +${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY) +SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake) +ADD_SUBDIRECTORY(debian) +ELSE() +SET(CPACK_GENERATOR "TGZ") +ENDIF() + +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR}) +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR}) +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH}) +INCLUDE(CPack) + +IF(PAHO_ENABLE_TESTING) --- End diff -- probably don't need tests, right? ---
[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312289#comment-16312289 ] ASF GitHub Bot commented on MINIFICPP-355: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi-minifi-cpp/pull/227 > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp pull request #227: MINIFICPP-355: Resolve issue with 32-bit ...
Github user asfgit closed the pull request at: https://github.com/apache/nifi-minifi-cpp/pull/227 ---
[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty
[ https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312282#comment-16312282 ] ASF GitHub Bot commented on NIFI-4724: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2362 @markap14 Thanks for your feedback, I've added another commit to incorporate your suggestions. Also rebased with the latest master just in case. I considered to implement the use of BlockingQueue for further optimization, too. However, I think it may be dangerous to reuse the same byte[] object because different FlowFiles may have different size of content. For example, the 1st FlowFile content is 'foo bar', then the 2nd FlowFile's is 'baz', then the 2nd message would be 'baz bar'. In addition to that, the `ProducerRecord` provided by Kafka client library does not take `size` and `offset` those are usually used to specify a part of a larger byte array. So, I think we need to create new byte[] object for each message. I hope my understanding is correct. But if there's more optimization can be done, please file a separate JIRA as I think it's out of scope for this ticket. > Publish kafka processors fails with FlowFileHandlingException if the flow > file is empty > --- > > Key: NIFI-4724 > URL: https://issues.apache.org/jira/browse/NIFI-4724 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.1.0 >Reporter: Mahesh Nayak >Assignee: Koji Kawamura > > 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile > 2. In GenerateFlowFile set the "File Size" to 0B. > 3. Start the flow. > Result : Kafka processor throws below exception > {code:None} > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding > PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught > Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask > org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > at > org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251) > at > org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi issue #2362: NIFI-4724: Support 0 byte message with PublishKafka
Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2362 @markap14 Thanks for your feedback, I've added another commit to incorporate your suggestions. Also rebased with the latest master just in case. I considered to implement the use of BlockingQueue for further optimization, too. However, I think it may be dangerous to reuse the same byte[] object because different FlowFiles may have different size of content. For example, the 1st FlowFile content is 'foo bar', then the 2nd FlowFile's is 'baz', then the 2nd message would be 'baz bar'. In addition to that, the `ProducerRecord` provided by Kafka client library does not take `size` and `offset` those are usually used to specify a part of a larger byte array. So, I think we need to create new byte[] object for each message. I hope my understanding is correct. But if there's more optimization can be done, please file a separate JIRA as I think it's out of scope for this ticket. ---
[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty
[ https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312279#comment-16312279 ] ASF GitHub Bot commented on NIFI-4724: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2362#discussion_r159791308 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java --- @@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b tracker = new InFlightMessageTracker(); } -try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { +try { byte[] messageContent; -try { +if (demarcatorBytes == null || demarcatorBytes.length == 0) { +// Send FlowFile content as it is, to support sending 0 byte message. +final ByteArrayOutputStream bos = new ByteArrayOutputStream(); --- End diff -- @markap14 Thanks for the advice. I switched to use StreamUtils as you suggested. > Publish kafka processors fails with FlowFileHandlingException if the flow > file is empty > --- > > Key: NIFI-4724 > URL: https://issues.apache.org/jira/browse/NIFI-4724 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.1.0 >Reporter: Mahesh Nayak >Assignee: Koji Kawamura > > 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile > 2. In GenerateFlowFile set the "File Size" to 0B. > 3. Start the flow. > Result : Kafka processor throws below exception > {code:None} > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding > PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught > Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask > org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > at > org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251) > at > org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2362: NIFI-4724: Support 0 byte message with PublishKafka
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2362#discussion_r159791308 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java --- @@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b tracker = new InFlightMessageTracker(); } -try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { +try { byte[] messageContent; -try { +if (demarcatorBytes == null || demarcatorBytes.length == 0) { +// Send FlowFile content as it is, to support sending 0 byte message. +final ByteArrayOutputStream bos = new ByteArrayOutputStream(); --- End diff -- @markap14 Thanks for the advice. I switched to use StreamUtils as you suggested. ---
[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty
[ https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312270#comment-16312270 ] ASF GitHub Bot commented on NIFI-4724: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2362#discussion_r159790221 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java --- @@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b tracker = new InFlightMessageTracker(); } -try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { +try { byte[] messageContent; -try { +if (demarcatorBytes == null || demarcatorBytes.length == 0) { --- End diff -- @markap14 I agree with that, added a code to transfer FlowFiles to 'failure' if content size exceeds the maxMessageSize. Thanks! > Publish kafka processors fails with FlowFileHandlingException if the flow > file is empty > --- > > Key: NIFI-4724 > URL: https://issues.apache.org/jira/browse/NIFI-4724 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.1.0 >Reporter: Mahesh Nayak >Assignee: Koji Kawamura > > 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile > 2. In GenerateFlowFile set the "File Size" to 0B. > 3. Start the flow. > Result : Kafka processor throws below exception > {code:None} > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding > PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught > Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] > o.a.n.c.t.ContinuallyRunProcessorTask > org.apache.nifi.processor.exception.FlowFileHandlingException: > StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0] > transfer relationship not specified > at > org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251) > at > org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2362: NIFI-4724: Support 0 byte message with PublishKafka
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2362#discussion_r159790221 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java --- @@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream flowFileContent, final b tracker = new InFlightMessageTracker(); } -try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { +try { byte[] messageContent; -try { +if (demarcatorBytes == null || demarcatorBytes.length == 0) { --- End diff -- @markap14 I agree with that, added a code to transfer FlowFiles to 'failure' if content size exceeds the maxMessageSize. Thanks! ---
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312207#comment-16312207 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2371 @markap14 I made the changes you requested. Please give it a once-over and let me know if there's anything outstanding. Thanks. > Create text count processor > --- > > Key: NIFI-4727 > URL: https://issues.apache.org/jira/browse/NIFI-4727 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.4.0 >Reporter: Andy LoPresto >Assignee: Andy LoPresto > Labels: processor, text > > A frequent community request is to count (lines/words/characters) in > arbitrary text. A {{CountTextProcessor}} would provide this functionality > natively and with solid performance, rather than abusing the {{SplitText}} or > {{ExecuteScript}} processors. > It should provide the following functionality (simultaneously, given options): > * Line count > * Non-empty line count > * Word count > * Character count > The flowfile content should remain unchanged, and each of the above (if > indicated) should be added as an attribute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi issue #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2371 @markap14 I made the changes you requested. Please give it a once-over and let me know if there's anything outstanding. Thanks. ---
[jira] [Updated] (NIFI-4739) UI - Relative logout link
[ https://issues.apache.org/jira/browse/NIFI-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Gilman updated NIFI-4739: -- Status: Patch Available (was: In Progress) > UI - Relative logout link > - > > Key: NIFI-4739 > URL: https://issues.apache.org/jira/browse/NIFI-4739 > Project: Apache NiFi > Issue Type: Bug > Components: Core UI >Reporter: Matt Gilman >Assignee: Matt Gilman > > When NiFi is running behind a proxy, we need to links (like logout) to be > relative instead of absolute to ensure the window is redirected to the > correct path. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4739) UI - Relative logout link
[ https://issues.apache.org/jira/browse/NIFI-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312068#comment-16312068 ] ASF GitHub Bot commented on NIFI-4739: -- GitHub user mcgilman opened a pull request: https://github.com/apache/nifi/pull/2374 NIFI-4739: Ensure logout action is using a relative URL NIFI-4739: - Ensuring the logout action is using a relative link. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcgilman/nifi NIFI-4739 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2374.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2374 commit cff79a39cb807cd8849067283b8a98c29abd73f2 Author: Matt GilmanDate: 2018-01-04T20:54:19Z NIFI-4739: - Ensuring the logout action is using a relative link. > UI - Relative logout link > - > > Key: NIFI-4739 > URL: https://issues.apache.org/jira/browse/NIFI-4739 > Project: Apache NiFi > Issue Type: Bug > Components: Core UI >Reporter: Matt Gilman >Assignee: Matt Gilman > > When NiFi is running behind a proxy, we need to links (like logout) to be > relative instead of absolute to ensure the window is redirected to the > correct path. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2374: NIFI-4739: Ensure logout action is using a relative...
GitHub user mcgilman opened a pull request: https://github.com/apache/nifi/pull/2374 NIFI-4739: Ensure logout action is using a relative URL NIFI-4739: - Ensuring the logout action is using a relative link. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mcgilman/nifi NIFI-4739 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2374.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2374 commit cff79a39cb807cd8849067283b8a98c29abd73f2 Author: Matt GilmanDate: 2018-01-04T20:54:19Z NIFI-4739: - Ensuring the logout action is using a relative link. ---
[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312058#comment-16312058 ] ASF GitHub Bot commented on MINIFICPP-355: -- Github user joewitt commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/227 +1 > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...
Github user joewitt commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/227 +1 ---
[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly
[ https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312030#comment-16312030 ] ASF GitHub Bot commented on MINIFICPP-355: -- Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/227 looks good, can we merge into 1 commit > Starting minifi on arm fails quietly > > > Key: MINIFICPP-355 > URL: https://issues.apache.org/jira/browse/MINIFICPP-355 > Project: NiFi MiNiFi C++ > Issue Type: Bug >Affects Versions: 0.3.0 > Environment: raspberry pi zero w - arm >Reporter: Joseph Witt > Attachments: config.yml, minifi-app.log > > > During startup the process quietly dies. Aldrin showed me the cool process > to gather more data > gdb bin/minifi > -> run > -> backtrace > which yields > {quote} > [Thread 0xb0eff160 (LWP 24133) exited] > [Thread 0xb2eff160 (LWP 24138) exited] > Thread 1 "minifi" received signal SIGSEGV, Segmentation fault. > strlen () at ../sysdeps/arm/armv6/strlen.S:26 > 26../sysdeps/arm/armv6/strlen.S: No such file or directory. > (gdb) backtrace > #0 strlen () at ../sysdeps/arm/armv6/strlen.S:26 > #1 0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, > format=format@entry=0x609884 "Setting %d as the max queue size for %s", > ap=..., ap@entry=...) > at vfprintf.c:1637 > #2 0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max > queue size for p => [success]", maxlen=, > format=0x609884 "Setting %d as the max queue size for %s", > format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at > vsnprintf.c:114 > #3 0xb64f5b18 in __snprintf (s=, maxlen=, > format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33 > #4 0x00284a4c in std::__cxx11::basic_stringstd::allocator > > org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] () > #5 0x00285dec in void > org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, > std::__cxx11::basic_string > const&) () > #6 0x0027efb4 in > org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*, > org::apache::nifi::minifi::core::ProcessGroup*) () > #7 0x0025ef3c in > org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) > () > #8 0x0025f230 in > org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string std::char_traits, std::allocator > const&) () > #9 0x002965b8 in org::apache::nifi::minifi::FlowController::load() () > #10 0x00161998 in main () > {quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/227 looks good, can we merge into 1 commit ---
[jira] [Created] (NIFI-4739) UI - Relative logout link
Matt Gilman created NIFI-4739: - Summary: UI - Relative logout link Key: NIFI-4739 URL: https://issues.apache.org/jira/browse/NIFI-4739 Project: Apache NiFi Issue Type: Bug Components: Core UI Reporter: Matt Gilman Assignee: Matt Gilman When NiFi is running behind a proxy, we need to links (like logout) to be relative instead of absolute to ensure the window is redirected to the correct path. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Gilman updated NIFI-4738: -- Resolution: Fixed Fix Version/s: 1.5.0 Status: Resolved (was: Patch Available) > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > Fix For: 1.5.0 > > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311973#comment-16311973 ] ASF GitHub Bot commented on NIFI-4738: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2373 Thanks @mattyb149! This has been merged to master. > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi issue #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsig...
Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2373 Thanks @mattyb149! This has been merged to master. ---
[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311972#comment-16311972 ] ASF GitHub Bot commented on NIFI-4738: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2373 > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311970#comment-16311970 ] ASF subversion and git services commented on NIFI-4738: --- Commit ce4374ee0017708252e955ac94d34b61c481a1ce in nifi's branch refs/heads/master from [~ca9mbu] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=ce4374e ] NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints. This closes #2373 > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digi...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2373 ---
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311960#comment-16311960 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744615 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt --- @@ -0,0 +1,34 @@ +’Twas brillig, and the slithy toves --- End diff -- I selected it because it is no longer under copyright. I used [these laws](https://fairuse.stanford.edu/overview/public-domain/welcome/) as guidelines, but perhaps the ASF has additional rules? > Create text count processor > --- > > Key: NIFI-4727 > URL: https://issues.apache.org/jira/browse/NIFI-4727 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.4.0 >Reporter: Andy LoPresto >Assignee: Andy LoPresto > Labels: processor, text > > A frequent community request is to count (lines/words/characters) in > arbitrary text. A {{CountTextProcessor}} would provide this functionality > natively and with solid performance, rather than abusing the {{SplitText}} or > {{ExecuteScript}} processors. > It should provide the following functionality (simultaneously, given options): > * Line count > * Non-empty line count > * Word count > * Character count > The flowfile content should remain unchanged, and each of the above (if > indicated) should be added as an attribute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744615 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt --- @@ -0,0 +1,34 @@ +’Twas brillig, and the slithy toves --- End diff -- I selected it because it is no longer under copyright. I used [these laws](https://fairuse.stanford.edu/overview/public-domain/welcome/) as guidelines, but perhaps the ASF has additional rules? ---
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311956#comment-16311956 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744183 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311959#comment-16311959 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744322 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.components.ValidationResult +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.security.util.EncryptionMethod +import org.apache.nifi.security.util.KeyDerivationFunction +import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor +import org.apache.nifi.util.MockProcessContext +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Ignore +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.security.Security + +@RunWith(JUnit4.class) +class CountTextTest extends GroovyTestCase { +private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class) + +@BeforeClass +static void setUpOnce() throws Exception { +Security.addProvider(new BouncyCastleProvider()) + +logger.metaClass.methodMissing = { String name, args -> +logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") +} +} + +@Before +void setUp() throws Exception { +} + +@After +void tearDown() throws Exception { +} + +@Test +void testShouldCountAllMetrics() throws Exception { +// Arrange +final TestRunner runner = TestRunners.newTestRunner(CountText.class) + +runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + +String INPUT_TEXT = """’Twas brillig, and the slithy toves --- End diff -- While *reading from a file* isn't under test here, I wanted to ensure that the newlines from static text and file loading weren't causing an issue. > Create text count processor > --- > > Key: NIFI-4727 > URL: https://issues.apache.org/jira/browse/NIFI-4727 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.4.0 >Reporter: Andy LoPresto >Assignee: Andy LoPresto > Labels: processor, text > > A frequent community request is to count (lines/words/characters) in > arbitrary text. A {{CountTextProcessor}} would provide this functionality > natively and with solid performance, rather than abusing the {{SplitText}} or > {{ExecuteScript}} processors. > It should provide the following functionality (simultaneously, given options): > * Line count > * Non-empty line count > * Word count > * Character count > The flowfile content should remain unchanged, and each of the above (if > indicated) should be added as an attribute. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311957#comment-16311957 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744222 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744322 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard + +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.components.ValidationResult +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.security.util.EncryptionMethod +import org.apache.nifi.security.util.KeyDerivationFunction +import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor +import org.apache.nifi.util.MockProcessContext +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Ignore +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.security.Security + +@RunWith(JUnit4.class) +class CountTextTest extends GroovyTestCase { +private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class) + +@BeforeClass +static void setUpOnce() throws Exception { +Security.addProvider(new BouncyCastleProvider()) + +logger.metaClass.methodMissing = { String name, args -> +logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") +} +} + +@Before +void setUp() throws Exception { +} + +@After +void tearDown() throws Exception { +} + +@Test +void testShouldCountAllMetrics() throws Exception { +// Arrange +final TestRunner runner = TestRunners.newTestRunner(CountText.class) + +runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true") +runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true") + +String INPUT_TEXT = """’Twas brillig, and the slithy toves --- End diff -- While *reading from a file* isn't under test here, I wanted to ensure that the newlines from static text and file loading weren't causing an issue. ---
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744222 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744183 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311954#comment-16311954 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744070 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159744070 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311952#comment-16311952 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743870 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311953#comment-16311953 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743902 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743902 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743870 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[jira] [Commented] (NIFI-4727) Create text count processor
[ https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311951#comment-16311951 ] ASF GitHub Bot commented on NIFI-4727: -- Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743739 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final
[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor
Github user alopresto commented on a diff in the pull request: https://github.com/apache/nifi/pull/2371#discussion_r159743739 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"count", "text", "line", "word", "character"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. " ++ "The resulting flowfile will not have its content modified.") +@WritesAttributes({ +@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"), +@WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"), +@WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"), +@WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"), +}) +@SeeAlso(SplitText.class) +public class CountText extends AbstractProcessor { +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String SYMBOL_REGEX = "[\\s-\\._]"; +private static final String WHITESPACE_ONLY_REGEX = "\\s"; + +// Attribute keys +public static final String TEXT_LINE_COUNT = "text.line.count"; +public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count"; +public static final String TEXT_WORD_COUNT = "text.word.count"; +public static final String TEXT_CHARACTER_COUNT = "text.character.count"; + +
[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311945#comment-16311945 ] ASF GitHub Bot commented on NIFI-4738: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2373 Will review... > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi issue #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsig...
Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2373 Will review... ---
[jira] [Updated] (NIFI-4383) UpdateRecord - cannot update arrays elements
[ https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mark Payne updated NIFI-4383: - Resolution: Fixed Fix Version/s: 1.5.0 Status: Resolved (was: Patch Available) > UpdateRecord - cannot update arrays elements > > > Key: NIFI-4383 > URL: https://issues.apache.org/jira/browse/NIFI-4383 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0 >Reporter: Pierre Villard >Assignee: Pierre Villard > Labels: records > Fix For: 1.5.0 > > > At the moment, if trying to use the update record to update the elements of > an array it won't have any effect. > Input: > {noformat} > { > "numbers" : [ 1, null, 4 ] > } > {noformat} > Parameters: > ||Path||Value||Expected output|| > |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}| > |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}| > |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}| > |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}| > When elements of the array are records, it's possible to update fields of the > record but not the record itself as-is. > Also in the MultiArrayIndexPath implementation, index of array elements is > not correctly provided. Because of that, wrong elements of the array could be > updated. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess updated NIFI-4738: --- Status: Patch Available (was: In Progress) > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311909#comment-16311909 ] ASF GitHub Bot commented on NIFI-4738: -- GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/2373 NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints I tested this with MEDIUMINT UNSIGNED, MEDIUMINT, and INT(9) UNSIGNED ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-4738 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2373 commit 9f2ec30f48efbf794884b7b5988b7bf265252904 Author: Matthew BurgessDate: 2018-01-04T19:30:47Z NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digi...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/2373 NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints I tested this with MEDIUMINT UNSIGNED, MEDIUMINT, and INT(9) UNSIGNED ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-4738 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2373 commit 9f2ec30f48efbf794884b7b5988b7bf265252904 Author: Matthew BurgessDate: 2018-01-04T19:30:47Z NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints ---
[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements
[ https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311907#comment-16311907 ] ASF GitHub Bot commented on NIFI-4383: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2208 > UpdateRecord - cannot update arrays elements > > > Key: NIFI-4383 > URL: https://issues.apache.org/jira/browse/NIFI-4383 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0 >Reporter: Pierre Villard >Assignee: Pierre Villard > Labels: records > > At the moment, if trying to use the update record to update the elements of > an array it won't have any effect. > Input: > {noformat} > { > "numbers" : [ 1, null, 4 ] > } > {noformat} > Parameters: > ||Path||Value||Expected output|| > |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}| > |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}| > |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}| > |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}| > When elements of the array are records, it's possible to update fields of the > record but not the record itself as-is. > Also in the MultiArrayIndexPath implementation, index of array elements is > not correctly provided. Because of that, wrong elements of the array could be > updated. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi issue #2208: NIFI-4383 - Fix UpdateRecord when updating arrays elements
Github user markap14 commented on the issue: https://github.com/apache/nifi/pull/2208 @pvillard31 all looks good to me. Thanks for catching this and updating it. Sorry it took so long to get merged, but it's in master now! ---
[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements
[ https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311905#comment-16311905 ] ASF subversion and git services commented on NIFI-4383: --- Commit 99d767aa44e4d5b1bc9fbca9a1e0ee6803972fea in nifi's branch refs/heads/master from [~pvillard] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=99d767a ] NIFI-4383 - Fix UpdateRecord when updating arrays elements. This closes #2208. Signed-off-by: Mark Payne> UpdateRecord - cannot update arrays elements > > > Key: NIFI-4383 > URL: https://issues.apache.org/jira/browse/NIFI-4383 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0 >Reporter: Pierre Villard >Assignee: Pierre Villard > Labels: records > > At the moment, if trying to use the update record to update the elements of > an array it won't have any effect. > Input: > {noformat} > { > "numbers" : [ 1, null, 4 ] > } > {noformat} > Parameters: > ||Path||Value||Expected output|| > |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}| > |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}| > |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}| > |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}| > When elements of the array are records, it's possible to update fields of the > record but not the record itself as-is. > Also in the MultiArrayIndexPath implementation, index of array elements is > not correctly provided. Because of that, wrong elements of the array could be > updated. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements
[ https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311908#comment-16311908 ] ASF GitHub Bot commented on NIFI-4383: -- Github user markap14 commented on the issue: https://github.com/apache/nifi/pull/2208 @pvillard31 all looks good to me. Thanks for catching this and updating it. Sorry it took so long to get merged, but it's in master now! > UpdateRecord - cannot update arrays elements > > > Key: NIFI-4383 > URL: https://issues.apache.org/jira/browse/NIFI-4383 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0 >Reporter: Pierre Villard >Assignee: Pierre Villard > Labels: records > > At the moment, if trying to use the update record to update the elements of > an array it won't have any effect. > Input: > {noformat} > { > "numbers" : [ 1, null, 4 ] > } > {noformat} > Parameters: > ||Path||Value||Expected output|| > |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}| > |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}| > |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}| > |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}| > When elements of the array are records, it's possible to update fields of the > record but not the record itself as-is. > Also in the MultiArrayIndexPath implementation, index of array elements is > not correctly provided. Because of that, wrong elements of the array could be > updated. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2208: NIFI-4383 - Fix UpdateRecord when updating arrays e...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2208 ---
[jira] [Assigned] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
[ https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Burgess reassigned NIFI-4738: -- Assignee: Matt Burgess > ExecuteSQL error when table uses 9-digit unsigned int > - > > Key: NIFI-4738 > URL: https://issues.apache.org/jira/browse/NIFI-4738 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Assignee: Matt Burgess > > There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit > integers to be treated as ints when the DB (at least MySQL) will return a > Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is > because the logic bug is not exposed when the MEDIUMINT is signed (as it > satisfies the other condition of the if clause). But if a field is declared > as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, > and treat it as an Integer if so. But an unsigned value with 9 digits will > fit in a Long, and should be treated as such in the schema. > Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< > MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int
Matt Burgess created NIFI-4738: -- Summary: ExecuteSQL error when table uses 9-digit unsigned int Key: NIFI-4738 URL: https://issues.apache.org/jira/browse/NIFI-4738 Project: Apache NiFi Issue Type: Bug Components: Extensions Reporter: Matt Burgess There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit integers to be treated as ints when the DB (at least MySQL) will return a Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is because the logic bug is not exposed when the MEDIUMINT is signed (as it satisfies the other condition of the if clause). But if a field is declared as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, and treat it as an Integer if so. But an unsigned value with 9 digits will fit in a Long, and should be treated as such in the schema. Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< MAX_DIGITS_PER_INT". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (NIFI-4595) Add ConsumeAzureEventHub processor
[ https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph Witt updated NIFI-4595: -- Resolution: Fixed Fix Version/s: 1.5.0 Status: Resolved (was: Patch Available) updated L a little, updated dep version and corrected small rebase/class renaming that had happened since PR up. +1 merged to master. > Add ConsumeAzureEventHub processor > -- > > Key: NIFI-4595 > URL: https://issues.apache.org/jira/browse/NIFI-4595 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.5.0 > > > As reported by NIFI-2835, the existing GetAzureEventHub processor does not > handle partition distribution among nodes in a NiFi cluster, and handling > offset information across NiFi restart. > The processor uses PartitionReceiver directly, however, there's another > approach to receive events from Event Hub, that is using EventProcessorHost > class. > EventProcessorHost uses Azure Storage to store consumer group offset > information so that it can distribute consumer load among consumer instances > within the same group. Also this way can handle restart and NiFi cluster > scale scenario better. > In addition to above enhancements, we could also add Record data model > capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord > processor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4595) Add ConsumeAzureEventHub processor
[ https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311880#comment-16311880 ] ASF GitHub Bot commented on NIFI-4595: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2264 > Add ConsumeAzureEventHub processor > -- > > Key: NIFI-4595 > URL: https://issues.apache.org/jira/browse/NIFI-4595 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > > As reported by NIFI-2835, the existing GetAzureEventHub processor does not > handle partition distribution among nodes in a NiFi cluster, and handling > offset information across NiFi restart. > The processor uses PartitionReceiver directly, however, there's another > approach to receive events from Event Hub, that is using EventProcessorHost > class. > EventProcessorHost uses Azure Storage to store consumer group offset > information so that it can distribute consumer load among consumer instances > within the same group. Also this way can handle restart and NiFi cluster > scale scenario better. > In addition to above enhancements, we could also add Record data model > capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord > processor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2264: NIFI-4595: Add ConsumeAzureEventHub.
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2264 ---
[jira] [Commented] (NIFI-4595) Add ConsumeAzureEventHub processor
[ https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311879#comment-16311879 ] ASF subversion and git services commented on NIFI-4595: --- Commit 98af3dc4cd56da324bb4572aab2dc395338ed45d in nifi's branch refs/heads/master from [~ijokarumawak] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=98af3dc ] NIFI-4595: This closes #2264. Add ConsumeAzureEventHub. Signed-off-by: joewitt> Add ConsumeAzureEventHub processor > -- > > Key: NIFI-4595 > URL: https://issues.apache.org/jira/browse/NIFI-4595 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > > As reported by NIFI-2835, the existing GetAzureEventHub processor does not > handle partition distribution among nodes in a NiFi cluster, and handling > offset information across NiFi restart. > The processor uses PartitionReceiver directly, however, there's another > approach to receive events from Event Hub, that is using EventProcessorHost > class. > EventProcessorHost uses Azure Storage to store consumer group offset > information so that it can distribute consumer load among consumer instances > within the same group. Also this way can handle restart and NiFi cluster > scale scenario better. > In addition to above enhancements, we could also add Record data model > capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord > processor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (NIFI-4733) Issue with variable registry two phase commit
[ https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Gilman updated NIFI-4733: -- Resolution: Fixed Fix Version/s: 1.5.0 Status: Resolved (was: Patch Available) > Issue with variable registry two phase commit > - > > Key: NIFI-4733 > URL: https://issues.apache.org/jira/browse/NIFI-4733 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.4.0 >Reporter: Matt Gilman >Assignee: Matt Gilman > Fix For: 1.5.0 > > > Need to address the logic issue during the two-phase commit exchange when > running clustered and invoking a variable registry update request. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit
[ https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311763#comment-16311763 ] ASF subversion and git services commented on NIFI-4733: --- Commit 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e in nifi's branch refs/heads/master from [~mcgilman] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=7a8dbb8 ] NIFI-4733: - Resolving logic issue in two phase commit when updating variable registry. This closes #2370 > Issue with variable registry two phase commit > - > > Key: NIFI-4733 > URL: https://issues.apache.org/jira/browse/NIFI-4733 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.4.0 >Reporter: Matt Gilman >Assignee: Matt Gilman > > Need to address the logic issue during the two-phase commit exchange when > running clustered and invoking a variable registry update request. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] nifi pull request #2370: NIFI-4733: Addressing two phase commit logic issue ...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2370 ---
[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit
[ https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311765#comment-16311765 ] ASF GitHub Bot commented on NIFI-4733: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2370 > Issue with variable registry two phase commit > - > > Key: NIFI-4733 > URL: https://issues.apache.org/jira/browse/NIFI-4733 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.4.0 >Reporter: Matt Gilman >Assignee: Matt Gilman > > Need to address the logic issue during the two-phase commit exchange when > running clustered and invoking a variable registry update request. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit
[ https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311761#comment-16311761 ] ASF GitHub Bot commented on NIFI-4733: -- Github user mcgilman commented on the issue: https://github.com/apache/nifi/pull/2370 Thanks @alopresto! I've addressed the comments here and I will merge to master. > Issue with variable registry two phase commit > - > > Key: NIFI-4733 > URL: https://issues.apache.org/jira/browse/NIFI-4733 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.4.0 >Reporter: Matt Gilman >Assignee: Matt Gilman > > Need to address the logic issue during the two-phase commit exchange when > running clustered and invoking a variable registry update request. -- This message was sent by Atlassian JIRA (v6.4.14#64029)