[GitHub] nifi pull request #2375: autoclosesocket

2018-01-04 Thread guangxuewu
GitHub user guangxuewu opened a pull request:

https://github.com/apache/nifi/pull/2375

autoclosesocket

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guangxuewu/nifi master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2375.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2375


commit 7c9456d25b273bd8586a9352772801f5d9b462be
Author: guangxuewu 
Date:   2018-01-05T05:46:55Z

autoclosesocket




---


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312453#comment-16312453
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line 
tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION 
${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build 
ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML 
based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

will remove


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312451#comment-16312451
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string , MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken ) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read([0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = [0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), , 
_) != MQTTCLIENT_SUCCESS) {
--- End diff --

it add the MQTT header and call socket write.
the deliverable callback is for QOS.
/**
 * This is a callback function. The client application
 * must provide an implementation of this function to enable asynchronous
 * notification of delivery of messages. The function is registered with the
 * client library by passing it as an argument to MQTTClient_setCallbacks().
 * It is called by the client library after the client application has
 * published a message to the server. It indicates that the necessary
 * handshaking and acknowledgements for the requested quality of service 
(see
 * MQTTClient_message.qos) have been completed. This function is executed 
on a
 * separate thread to the one on which the client application is running.
 * Note:MQTTClient_deliveryComplete() is not called when messages are
 * published at QoS0.
 * @param context A pointer to the context value originally 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312452#comment-16312452
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

will remove


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string , MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken ) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read([0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = [0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), , 
_) != MQTTCLIENT_SUCCESS) {
--- End diff --

it add the MQTT header and call socket write.
the deliverable callback is for QOS.
/**
 * This is a callback function. The client application
 * must provide an implementation of this function to enable asynchronous
 * notification of delivery of messages. The function is registered with the
 * client library by passing it as an argument to MQTTClient_setCallbacks().
 * It is called by the client library after the client application has
 * published a message to the server. It indicates that the necessary
 * handshaking and acknowledgements for the requested quality of service 
(see
 * MQTTClient_message.qos) have been completed. This function is executed 
on a
 * separate thread to the one on which the client application is running.
 * Note:MQTTClient_deliveryComplete() is not called when messages are
 * published at QoS0.
 * @param context A pointer to the context value originally passed to
 * MQTTClient_setCallbacks(), which contains any application-specific 
context.
 * @param dt The ::MQTTClient_deliveryToken associated with
 * the published message. Applications can check that all messages have been
 * 

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line 
tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION 
${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build 
ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML 
based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

will remove


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

will remove


---


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312444#comment-16312444
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage();
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr stream) {
+  int64_t len = 
stream->write(reinterpret_cast(message_->payload), 
message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when 
creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr 
, const std::shared_ptr );
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque _queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

will do


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312443#comment-16312443
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+
logger_(logging::LoggerFactory::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

it should be. only side effect is if app halt and did not unsub. The broker 
will buffer the msgs to the topic while the client is go away. if the app 
restart with the same topic and same client ID, it will rx all msg buffer for 
that client buffered in the broker.



> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage();
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr stream) {
+  int64_t len = 
stream->write(reinterpret_cast(message_->payload), 
message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when 
creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr 
, const std::shared_ptr );
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque _queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

will do


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+
logger_(logging::LoggerFactory::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

it should be. only side effect is if app halt and did not unsub. The broker 
will buffer the msgs to the topic while the client is go away. if the app 
restart with the same topic and same client ID, it will rx all msg buffer for 
that client buffered in the broker.



---


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312438#comment-16312438
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312427#comment-16312427
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312426#comment-16312426
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && 

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && 

[jira] [Updated] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo updated MINIFICPP-355:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Closed, pending additional issues found by the reporter. 

> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
>Assignee: marco polo
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo reassigned MINIFICPP-355:


Assignee: marco polo

> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
>Assignee: marco polo
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo updated MINIFICPP-355:
-
Status: Patch Available  (was: Open)

> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312326#comment-16312326
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312320#comment-16312320
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+
logger_(logging::LoggerFactory::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

what happens if unsubscribe is not called due to failure? Is that 
eventually okay?


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312318#comment-16312318
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312322#comment-16312322
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312319#comment-16312319
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312324#comment-16312324
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

this file doesn't seem like it's needed. 


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312321#comment-16312321
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line 
tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION 
${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build 
ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML 
based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

probably don't need tests, right?


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312325#comment-16312325
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage();
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr stream) {
+  int64_t len = 
stream->write(reinterpret_cast(message_->payload), 
message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when 
creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr 
, const std::shared_ptr );
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque _queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

Could you use a lock free queue here? might not save much but may increase 
throughput and we have one in our code base already.


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312323#comment-16312323
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string , MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken ) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read([0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = [0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), , 
_) != MQTTCLIENT_SUCCESS) {
--- End diff --

does the publish copy the buffer? If not, does it finish entirely or will 
it create a callback ? I ask because you're passing in the buffer as the 
payload, but if there is a callback, we could possibly have memory that's freed 
when this function exits with the MQTTClient still in progress. 


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage();
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr stream) {
+  int64_t len = 
stream->write(reinterpret_cast(message_->payload), 
message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when 
creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr 
, const std::shared_ptr );
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque _queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

Could you use a lock free queue here? might not save much but may increase 
throughput and we have one in our code base already.


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+
logger_(logging::LoggerFactory::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

what happens if unsubscribe is not called due to failure? Is that 
eventually okay?


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string , MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken ) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read([0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = [0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), , 
_) != MQTTCLIENT_SUCCESS) {
--- End diff --

does the publish copy the buffer? If not, does it finish entirely or will 
it create a callback ? I ask because you're passing in the buffer as the 
payload, but if there is a callback, we could possibly have memory that's freed 
when this function exits with the MQTTClient still in progress. 


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

this file doesn't seem like it's needed. 


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
+userName_ = value;
+logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_);
+  }
+  value = "";
+  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread phrocker
Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line 
tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION 
${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build 
ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML 
based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

probably don't need tests, right?


---


[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312289#comment-16312289
 ] 

ASF GitHub Bot commented on MINIFICPP-355:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi-minifi-cpp/pull/227


> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp pull request #227: MINIFICPP-355: Resolve issue with 32-bit ...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi-minifi-cpp/pull/227


---


[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312282#comment-16312282
 ] 

ASF GitHub Bot commented on NIFI-4724:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2362
  
@markap14 Thanks for your feedback, I've added another commit to 
incorporate your suggestions. Also rebased with the latest master just in case.

I considered to implement the use of BlockingQueue for further 
optimization, too. However, I think it may be dangerous to reuse the same 
byte[] object because different FlowFiles may have different size of content. 
For example, the 1st FlowFile content is 'foo bar', then the 2nd FlowFile's is 
'baz', then the 2nd message would be 'baz bar'. In addition to that, the 
`ProducerRecord` provided by Kafka client library does not take `size` and 
`offset` those are usually used to specify a part of a larger byte array. So, I 
think we need to create new byte[] object for each message. I hope my 
understanding is correct. But if there's more optimization can be done, please 
file a separate JIRA as I think it's out of scope for this ticket.


> Publish kafka processors fails with FlowFileHandlingException if the flow 
> file is empty
> ---
>
> Key: NIFI-4724
> URL: https://issues.apache.org/jira/browse/NIFI-4724
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.1.0
>Reporter: Mahesh Nayak
>Assignee: Koji Kawamura
>
> 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile
> 2. In GenerateFlowFile set the "File Size" to 0B.
> 3. Start the flow.
> Result : Kafka processor throws below exception
> {code:None}
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught 
> Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
> at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
> at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2362: NIFI-4724: Support 0 byte message with PublishKafka

2018-01-04 Thread ijokarumawak
Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2362
  
@markap14 Thanks for your feedback, I've added another commit to 
incorporate your suggestions. Also rebased with the latest master just in case.

I considered to implement the use of BlockingQueue for further 
optimization, too. However, I think it may be dangerous to reuse the same 
byte[] object because different FlowFiles may have different size of content. 
For example, the 1st FlowFile content is 'foo bar', then the 2nd FlowFile's is 
'baz', then the 2nd message would be 'baz bar'. In addition to that, the 
`ProducerRecord` provided by Kafka client library does not take `size` and 
`offset` those are usually used to specify a part of a larger byte array. So, I 
think we need to create new byte[] object for each message. I hope my 
understanding is correct. But if there's more optimization can be done, please 
file a separate JIRA as I think it's out of scope for this ticket.


---


[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312279#comment-16312279
 ] 

ASF GitHub Bot commented on NIFI-4724:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2362#discussion_r159791308
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
 ---
@@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream 
flowFileContent, final b
 tracker = new InFlightMessageTracker();
 }
 
-try (final StreamDemarcator demarcator = new 
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+try {
 byte[] messageContent;
-try {
+if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+// Send FlowFile content as it is, to support sending 0 
byte message.
+final ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
--- End diff --

@markap14 Thanks for the advice. I switched to use StreamUtils as you 
suggested.


> Publish kafka processors fails with FlowFileHandlingException if the flow 
> file is empty
> ---
>
> Key: NIFI-4724
> URL: https://issues.apache.org/jira/browse/NIFI-4724
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.1.0
>Reporter: Mahesh Nayak
>Assignee: Koji Kawamura
>
> 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile
> 2. In GenerateFlowFile set the "File Size" to 0B.
> 3. Start the flow.
> Result : Kafka processor throws below exception
> {code:None}
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught 
> Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
> at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
> at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2362: NIFI-4724: Support 0 byte message with PublishKafka

2018-01-04 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2362#discussion_r159791308
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
 ---
@@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream 
flowFileContent, final b
 tracker = new InFlightMessageTracker();
 }
 
-try (final StreamDemarcator demarcator = new 
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+try {
 byte[] messageContent;
-try {
+if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+// Send FlowFile content as it is, to support sending 0 
byte message.
+final ByteArrayOutputStream bos = new 
ByteArrayOutputStream();
--- End diff --

@markap14 Thanks for the advice. I switched to use StreamUtils as you 
suggested.


---


[jira] [Commented] (NIFI-4724) Publish kafka processors fails with FlowFileHandlingException if the flow file is empty

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312270#comment-16312270
 ] 

ASF GitHub Bot commented on NIFI-4724:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2362#discussion_r159790221
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
 ---
@@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream 
flowFileContent, final b
 tracker = new InFlightMessageTracker();
 }
 
-try (final StreamDemarcator demarcator = new 
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+try {
 byte[] messageContent;
-try {
+if (demarcatorBytes == null || demarcatorBytes.length == 0) {
--- End diff --

@markap14 I agree with that, added a code to transfer FlowFiles to 
'failure' if content size exceeds the maxMessageSize. Thanks!


> Publish kafka processors fails with FlowFileHandlingException if the flow 
> file is empty
> ---
>
> Key: NIFI-4724
> URL: https://issues.apache.org/jira/browse/NIFI-4724
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.1.0
>Reporter: Mahesh Nayak
>Assignee: Koji Kawamura
>
> 1. Construct the flow GenerateFlowFile --> PublishKafka --> PutFile
> 2. In GenerateFlowFile set the "File Size" to 0B.
> 3. Start the flow.
> Result : Kafka processor throws below exception
> {code:None}
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PublishKafka_0_10[id=95dbc77a-0160-1000--69761c4e] due to uncaught 
> Exception: org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> 2017-12-27 02:49:21,933 WARN [Timer-Driven Process Thread-9] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=4d6cb989-b6a7-4129-9dfc-1598ee2b3937,claim=,offset=0,name=7061091998478433,size=0]
>  transfer relationship not specified
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
> at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
> at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
> at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2362: NIFI-4724: Support 0 byte message with PublishKafka

2018-01-04 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2362#discussion_r159790221
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
 ---
@@ -71,9 +72,18 @@ void publish(final FlowFile flowFile, final InputStream 
flowFileContent, final b
 tracker = new InFlightMessageTracker();
 }
 
-try (final StreamDemarcator demarcator = new 
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+try {
 byte[] messageContent;
-try {
+if (demarcatorBytes == null || demarcatorBytes.length == 0) {
--- End diff --

@markap14 I agree with that, added a code to transfer FlowFiles to 
'failure' if content size exceeds the maxMessageSize. Thanks!


---


[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312207#comment-16312207
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on the issue:

https://github.com/apache/nifi/pull/2371
  
@markap14 I made the changes you requested. Please give it a once-over and 
let me know if there's anything outstanding. Thanks. 


> Create text count processor
> ---
>
> Key: NIFI-4727
> URL: https://issues.apache.org/jira/browse/NIFI-4727
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.4.0
>Reporter: Andy LoPresto
>Assignee: Andy LoPresto
>  Labels: processor, text
>
> A frequent community request is to count (lines/words/characters) in 
> arbitrary text. A {{CountTextProcessor}} would provide this functionality 
> natively and with solid performance, rather than abusing the {{SplitText}} or 
> {{ExecuteScript}} processors. 
> It should provide the following functionality (simultaneously, given options):
> * Line count
> * Non-empty line count
> * Word count
> * Character count
> The flowfile content should remain unchanged, and each of the above (if 
> indicated) should be added as an attribute. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on the issue:

https://github.com/apache/nifi/pull/2371
  
@markap14 I made the changes you requested. Please give it a once-over and 
let me know if there's anything outstanding. Thanks. 


---


[jira] [Updated] (NIFI-4739) UI - Relative logout link

2018-01-04 Thread Matt Gilman (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Gilman updated NIFI-4739:
--
Status: Patch Available  (was: In Progress)

> UI - Relative logout link
> -
>
> Key: NIFI-4739
> URL: https://issues.apache.org/jira/browse/NIFI-4739
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core UI
>Reporter: Matt Gilman
>Assignee: Matt Gilman
>
> When NiFi is running behind a proxy, we need to links (like logout) to be 
> relative instead of absolute to ensure the window is redirected to the 
> correct path.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4739) UI - Relative logout link

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312068#comment-16312068
 ] 

ASF GitHub Bot commented on NIFI-4739:
--

GitHub user mcgilman opened a pull request:

https://github.com/apache/nifi/pull/2374

NIFI-4739: Ensure logout action is using a relative URL

NIFI-4739:
- Ensuring the logout action is using a relative link.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mcgilman/nifi NIFI-4739

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2374


commit cff79a39cb807cd8849067283b8a98c29abd73f2
Author: Matt Gilman 
Date:   2018-01-04T20:54:19Z

NIFI-4739:
- Ensuring the logout action is using a relative link.




> UI - Relative logout link
> -
>
> Key: NIFI-4739
> URL: https://issues.apache.org/jira/browse/NIFI-4739
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core UI
>Reporter: Matt Gilman
>Assignee: Matt Gilman
>
> When NiFi is running behind a proxy, we need to links (like logout) to be 
> relative instead of absolute to ensure the window is redirected to the 
> correct path.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2374: NIFI-4739: Ensure logout action is using a relative...

2018-01-04 Thread mcgilman
GitHub user mcgilman opened a pull request:

https://github.com/apache/nifi/pull/2374

NIFI-4739: Ensure logout action is using a relative URL

NIFI-4739:
- Ensuring the logout action is using a relative link.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mcgilman/nifi NIFI-4739

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2374


commit cff79a39cb807cd8849067283b8a98c29abd73f2
Author: Matt Gilman 
Date:   2018-01-04T20:54:19Z

NIFI-4739:
- Ensuring the logout action is using a relative link.




---


[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312058#comment-16312058
 ] 

ASF GitHub Bot commented on MINIFICPP-355:
--

Github user joewitt commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/227
  
+1


> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...

2018-01-04 Thread joewitt
Github user joewitt commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/227
  
+1


---


[jira] [Commented] (MINIFICPP-355) Starting minifi on arm fails quietly

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312030#comment-16312030
 ] 

ASF GitHub Bot commented on MINIFICPP-355:
--

Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/227
  
looks good, can we merge into 1 commit


> Starting minifi on arm fails quietly
> 
>
> Key: MINIFICPP-355
> URL: https://issues.apache.org/jira/browse/MINIFICPP-355
> Project: NiFi MiNiFi C++
>  Issue Type: Bug
>Affects Versions: 0.3.0
> Environment: raspberry pi zero w - arm
>Reporter: Joseph Witt
> Attachments: config.yml, minifi-app.log
>
>
> During startup the process quietly dies.  Aldrin showed me the cool process 
> to gather more data
> gdb bin/minifi
> -> run
> -> backtrace
> which yields 
> {quote}
> [Thread 0xb0eff160 (LWP 24133) exited]
> [Thread 0xb2eff160 (LWP 24138) exited]
> Thread 1 "minifi" received signal SIGSEGV, Segmentation fault.
> strlen () at ../sysdeps/arm/armv6/strlen.S:26
> 26../sysdeps/arm/armv6/strlen.S: No such file or directory.
> (gdb) backtrace
> #0  strlen () at ../sysdeps/arm/armv6/strlen.S:26
> #1  0xb64f0690 in _IO_vfprintf_internal (s=s@entry=0xbeffc3c0, 
> format=format@entry=0x609884 "Setting %d as the max queue size for %s", 
> ap=..., ap@entry=...)
> at vfprintf.c:1637
> #2  0xb6513d2c in _IO_vsnprintf (string=0xbeffc4f0 "Setting 0 as the max 
> queue size for p => [success]", maxlen=,
> format=0x609884 "Setting %d as the max queue size for %s", 
> format@entry=0xbeffcac8 "\330\033\207", args=..., args@entry=...) at 
> vsnprintf.c:114
> #3  0xb64f5b18 in __snprintf (s=, maxlen=, 
> format=0x609884 "Setting %d as the max queue size for %s") at snprintf.c:33
> #4  0x00284a4c in std::__cxx11::basic_string std::allocator > 
> org::apache::nifi::minifi::core::logging::format_string const*>(char const*, long long&&, char const*&&) [clone .isra.369] ()
> #5  0x00285dec in void 
> org::apache::nifi::minifi::core::logging::Logger::log std::__cxx11::basic_string > >(spdlog::level::level_enum, char const*, long long const&, 
> std::__cxx11::basic_string > const&) ()
> #6  0x0027efb4 in 
> org::apache::nifi::minifi::core::YamlConfiguration::parseConnectionYaml(YAML::Node*,
>  org::apache::nifi::minifi::core::ProcessGroup*) ()
> #7  0x0025ef3c in 
> org::apache::nifi::minifi::core::YamlConfiguration::getYamlRoot(YAML::Node*) 
> ()
> #8  0x0025f230 in 
> org::apache::nifi::minifi::core::YamlConfiguration::getRoot(std::__cxx11::basic_string  std::char_traits, std::allocator > const&) ()
> #9  0x002965b8 in org::apache::nifi::minifi::FlowController::load() ()
> #10 0x00161998 in main ()
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...

2018-01-04 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/227
  
looks good, can we merge into 1 commit


---


[jira] [Created] (NIFI-4739) UI - Relative logout link

2018-01-04 Thread Matt Gilman (JIRA)
Matt Gilman created NIFI-4739:
-

 Summary: UI - Relative logout link
 Key: NIFI-4739
 URL: https://issues.apache.org/jira/browse/NIFI-4739
 Project: Apache NiFi
  Issue Type: Bug
  Components: Core UI
Reporter: Matt Gilman
Assignee: Matt Gilman


When NiFi is running behind a proxy, we need to links (like logout) to be 
relative instead of absolute to ensure the window is redirected to the correct 
path.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread Matt Gilman (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Gilman updated NIFI-4738:
--
   Resolution: Fixed
Fix Version/s: 1.5.0
   Status: Resolved  (was: Patch Available)

> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
> Fix For: 1.5.0
>
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311973#comment-16311973
 ] 

ASF GitHub Bot commented on NIFI-4738:
--

Github user mcgilman commented on the issue:

https://github.com/apache/nifi/pull/2373
  
Thanks @mattyb149! This has been merged to master.


> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsig...

2018-01-04 Thread mcgilman
Github user mcgilman commented on the issue:

https://github.com/apache/nifi/pull/2373
  
Thanks @mattyb149! This has been merged to master.


---


[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311972#comment-16311972
 ] 

ASF GitHub Bot commented on NIFI-4738:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2373


> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311970#comment-16311970
 ] 

ASF subversion and git services commented on NIFI-4738:
---

Commit ce4374ee0017708252e955ac94d34b61c481a1ce in nifi's branch 
refs/heads/master from [~ca9mbu]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=ce4374e ]

NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints. This closes 
#2373


> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digi...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2373


---


[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311960#comment-16311960
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744615
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt
 ---
@@ -0,0 +1,34 @@
+’Twas brillig, and the slithy toves
--- End diff --

I selected it because it is no longer under copyright. I used [these 
laws](https://fairuse.stanford.edu/overview/public-domain/welcome/) as 
guidelines, but perhaps the ASF has additional rules?


> Create text count processor
> ---
>
> Key: NIFI-4727
> URL: https://issues.apache.org/jira/browse/NIFI-4727
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.4.0
>Reporter: Andy LoPresto
>Assignee: Andy LoPresto
>  Labels: processor, text
>
> A frequent community request is to count (lines/words/characters) in 
> arbitrary text. A {{CountTextProcessor}} would provide this functionality 
> natively and with solid performance, rather than abusing the {{SplitText}} or 
> {{ExecuteScript}} processors. 
> It should provide the following functionality (simultaneously, given options):
> * Line count
> * Non-empty line count
> * Word count
> * Character count
> The flowfile content should remain unchanged, and each of the above (if 
> indicated) should be added as an attribute. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744615
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt
 ---
@@ -0,0 +1,34 @@
+’Twas brillig, and the slithy toves
--- End diff --

I selected it because it is no longer under copyright. I used [these 
laws](https://fairuse.stanford.edu/overview/public-domain/welcome/) as 
guidelines, but perhaps the ASF has additional rules?


---


[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311956#comment-16311956
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744183
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311959#comment-16311959
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744322
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.components.ValidationResult
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.security.util.EncryptionMethod
+import org.apache.nifi.security.util.KeyDerivationFunction
+import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor
+import org.apache.nifi.util.MockProcessContext
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Ignore
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.security.Security
+
+@RunWith(JUnit4.class)
+class CountTextTest extends GroovyTestCase {
+private static final Logger logger = 
LoggerFactory.getLogger(CountTextTest.class)
+
+@BeforeClass
+static void setUpOnce() throws Exception {
+Security.addProvider(new BouncyCastleProvider())
+
+logger.metaClass.methodMissing = { String name, args ->
+logger.info("[${name?.toUpperCase()}] ${(args as List).join(" 
")}")
+}
+}
+
+@Before
+void setUp() throws Exception {
+}
+
+@After
+void tearDown() throws Exception {
+}
+
+@Test
+void testShouldCountAllMetrics() throws Exception {
+// Arrange
+final TestRunner runner = 
TestRunners.newTestRunner(CountText.class)
+
+runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+
+String INPUT_TEXT = """’Twas brillig, and the slithy toves
--- End diff --

While *reading from a file* isn't under test here, I wanted to ensure that 
the newlines from static text and file loading weren't causing an issue. 


> Create text count processor
> ---
>
> Key: NIFI-4727
> URL: https://issues.apache.org/jira/browse/NIFI-4727
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.4.0
>Reporter: Andy LoPresto
>Assignee: Andy LoPresto
>  Labels: processor, text
>
> A frequent community request is to count (lines/words/characters) in 
> arbitrary text. A {{CountTextProcessor}} would provide this functionality 
> natively and with solid performance, rather than abusing the {{SplitText}} or 
> {{ExecuteScript}} processors. 
> It should provide the following functionality (simultaneously, given options):
> * Line count
> * Non-empty line count
> * Word count
> * Character count
> The flowfile content should remain unchanged, and each of the above (if 
> indicated) should be added as an attribute. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311957#comment-16311957
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744222
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744322
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.components.ValidationResult
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.security.util.EncryptionMethod
+import org.apache.nifi.security.util.KeyDerivationFunction
+import org.apache.nifi.security.util.crypto.PasswordBasedEncryptor
+import org.apache.nifi.util.MockProcessContext
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Assert
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Ignore
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.security.Security
+
+@RunWith(JUnit4.class)
+class CountTextTest extends GroovyTestCase {
+private static final Logger logger = 
LoggerFactory.getLogger(CountTextTest.class)
+
+@BeforeClass
+static void setUpOnce() throws Exception {
+Security.addProvider(new BouncyCastleProvider())
+
+logger.metaClass.methodMissing = { String name, args ->
+logger.info("[${name?.toUpperCase()}] ${(args as List).join(" 
")}")
+}
+}
+
+@Before
+void setUp() throws Exception {
+}
+
+@After
+void tearDown() throws Exception {
+}
+
+@Test
+void testShouldCountAllMetrics() throws Exception {
+// Arrange
+final TestRunner runner = 
TestRunners.newTestRunner(CountText.class)
+
+runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+
+String INPUT_TEXT = """’Twas brillig, and the slithy toves
--- End diff --

While *reading from a file* isn't under test here, I wanted to ensure that 
the newlines from static text and file loading weren't causing an issue. 


---


[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744222
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744183
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311954#comment-16311954
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744070
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159744070
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311952#comment-16311952
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743870
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311953#comment-16311953
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743902
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743902
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743870
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[jira] [Commented] (NIFI-4727) Create text count processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311951#comment-16311951
 ] 

ASF GitHub Bot commented on NIFI-4727:
--

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743739
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final 

[GitHub] nifi pull request #2371: NIFI-4727 Add CountText processor

2018-01-04 Thread alopresto
Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2371#discussion_r159743739
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The 
requested results will be recorded as attributes. "
++ "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "text.line.count", description = "The 
number of lines of text present in the FlowFile content"),
+@WritesAttribute(attribute = "text.line.nonempty.count", 
description = "The number of lines of text (with at least one non-whitespace 
character) present in the original FlowFile"),
+@WritesAttribute(attribute = "text.word.count", description = "The 
number of words present in the original FlowFile"),
+@WritesAttribute(attribute = "text.character.count", description = 
"The number of characters (given the specified character encoding) present in 
the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+private static final List STANDARD_CHARSETS = Arrays.asList(
+StandardCharsets.UTF_8,
+StandardCharsets.US_ASCII,
+StandardCharsets.ISO_8859_1,
+StandardCharsets.UTF_16,
+StandardCharsets.UTF_16LE,
+StandardCharsets.UTF_16BE);
+
+private static final String SYMBOL_REGEX = "[\\s-\\._]";
+private static final String WHITESPACE_ONLY_REGEX = "\\s";
+
+// Attribute keys
+public static final String TEXT_LINE_COUNT = "text.line.count";
+public static final String TEXT_LINE_NONEMPTY_COUNT = 
"text.line.nonempty.count";
+public static final String TEXT_WORD_COUNT = "text.word.count";
+public static final String TEXT_CHARACTER_COUNT = 
"text.character.count";
+
+

[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311945#comment-16311945
 ] 

ASF GitHub Bot commented on NIFI-4738:
--

Github user mcgilman commented on the issue:

https://github.com/apache/nifi/pull/2373
  
Will review...


> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsig...

2018-01-04 Thread mcgilman
Github user mcgilman commented on the issue:

https://github.com/apache/nifi/pull/2373
  
Will review...


---


[jira] [Updated] (NIFI-4383) UpdateRecord - cannot update arrays elements

2018-01-04 Thread Mark Payne (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Payne updated NIFI-4383:
-
   Resolution: Fixed
Fix Version/s: 1.5.0
   Status: Resolved  (was: Patch Available)

> UpdateRecord - cannot update arrays elements
> 
>
> Key: NIFI-4383
> URL: https://issues.apache.org/jira/browse/NIFI-4383
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Pierre Villard
>Assignee: Pierre Villard
>  Labels: records
> Fix For: 1.5.0
>
>
> At the moment, if trying to use the update record to update the elements of 
> an array it won't have any effect.
> Input:
> {noformat}
> {
>   "numbers" : [ 1, null, 4 ]
> }
> {noformat}
> Parameters:
> ||Path||Value||Expected output||
> |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}|
> |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}|
> |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}|
> |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}|
> When elements of the array are records, it's possible to update fields of the 
> record but not the record itself as-is.
> Also in the MultiArrayIndexPath implementation, index of array elements is 
> not correctly provided. Because of that, wrong elements of the array could be 
> updated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread Matt Burgess (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess updated NIFI-4738:
---
Status: Patch Available  (was: In Progress)

> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311909#comment-16311909
 ] 

ASF GitHub Bot commented on NIFI-4738:
--

GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/2373

NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints

I tested this with MEDIUMINT UNSIGNED, MEDIUMINT, and INT(9) UNSIGNED

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-4738

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2373


commit 9f2ec30f48efbf794884b7b5988b7bf265252904
Author: Matthew Burgess 
Date:   2018-01-04T19:30:47Z

NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints




> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2373: NIFI-4738: Fixed logic bug in JdbcCommon for 9-digi...

2018-01-04 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/2373

NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints

I tested this with MEDIUMINT UNSIGNED, MEDIUMINT, and INT(9) UNSIGNED

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-4738

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2373


commit 9f2ec30f48efbf794884b7b5988b7bf265252904
Author: Matthew Burgess 
Date:   2018-01-04T19:30:47Z

NIFI-4738: Fixed logic bug in JdbcCommon for 9-digit unsigned ints




---


[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311907#comment-16311907
 ] 

ASF GitHub Bot commented on NIFI-4383:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2208


> UpdateRecord - cannot update arrays elements
> 
>
> Key: NIFI-4383
> URL: https://issues.apache.org/jira/browse/NIFI-4383
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Pierre Villard
>Assignee: Pierre Villard
>  Labels: records
>
> At the moment, if trying to use the update record to update the elements of 
> an array it won't have any effect.
> Input:
> {noformat}
> {
>   "numbers" : [ 1, null, 4 ]
> }
> {noformat}
> Parameters:
> ||Path||Value||Expected output||
> |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}|
> |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}|
> |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}|
> |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}|
> When elements of the array are records, it's possible to update fields of the 
> record but not the record itself as-is.
> Also in the MultiArrayIndexPath implementation, index of array elements is 
> not correctly provided. Because of that, wrong elements of the array could be 
> updated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2208: NIFI-4383 - Fix UpdateRecord when updating arrays elements

2018-01-04 Thread markap14
Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2208
  
@pvillard31 all looks good to me. Thanks for catching this and updating it. 
Sorry it took so long to get merged, but it's in master now!


---


[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements

2018-01-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311905#comment-16311905
 ] 

ASF subversion and git services commented on NIFI-4383:
---

Commit 99d767aa44e4d5b1bc9fbca9a1e0ee6803972fea in nifi's branch 
refs/heads/master from [~pvillard]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=99d767a ]

NIFI-4383 - Fix UpdateRecord when updating arrays elements. This closes #2208.

Signed-off-by: Mark Payne 


> UpdateRecord - cannot update arrays elements
> 
>
> Key: NIFI-4383
> URL: https://issues.apache.org/jira/browse/NIFI-4383
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Pierre Villard
>Assignee: Pierre Villard
>  Labels: records
>
> At the moment, if trying to use the update record to update the elements of 
> an array it won't have any effect.
> Input:
> {noformat}
> {
>   "numbers" : [ 1, null, 4 ]
> }
> {noformat}
> Parameters:
> ||Path||Value||Expected output||
> |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}|
> |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}|
> |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}|
> |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}|
> When elements of the array are records, it's possible to update fields of the 
> record but not the record itself as-is.
> Also in the MultiArrayIndexPath implementation, index of array elements is 
> not correctly provided. Because of that, wrong elements of the array could be 
> updated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4383) UpdateRecord - cannot update arrays elements

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311908#comment-16311908
 ] 

ASF GitHub Bot commented on NIFI-4383:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2208
  
@pvillard31 all looks good to me. Thanks for catching this and updating it. 
Sorry it took so long to get merged, but it's in master now!


> UpdateRecord - cannot update arrays elements
> 
>
> Key: NIFI-4383
> URL: https://issues.apache.org/jira/browse/NIFI-4383
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Pierre Villard
>Assignee: Pierre Villard
>  Labels: records
>
> At the moment, if trying to use the update record to update the elements of 
> an array it won't have any effect.
> Input:
> {noformat}
> {
>   "numbers" : [ 1, null, 4 ]
> }
> {noformat}
> Parameters:
> ||Path||Value||Expected output||
> |{{/numbers[*]}}|{{8}}|{{"numbers" : [ 8, 8, 8 ]}}|
> |{{/numbers[1]}}|{{8}}|{{"numbers" : [ 1, 8, 4 ]}}|
> |{{/numbers[0..1]}}|{{8}}|{{"numbers" : [ 8, 8, 4 ]}}|
> |{{/numbers[0,2]}}|{{8}}|{{"numbers" : [ 8, null, 8 ]}}|
> When elements of the array are records, it's possible to update fields of the 
> record but not the record itself as-is.
> Also in the MultiArrayIndexPath implementation, index of array elements is 
> not correctly provided. Because of that, wrong elements of the array could be 
> updated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2208: NIFI-4383 - Fix UpdateRecord when updating arrays e...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2208


---


[jira] [Assigned] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread Matt Burgess (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Burgess reassigned NIFI-4738:
--

Assignee: Matt Burgess

> ExecuteSQL error when table uses 9-digit unsigned int
> -
>
> Key: NIFI-4738
> URL: https://issues.apache.org/jira/browse/NIFI-4738
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>
> There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
> integers to be treated as ints when the DB (at least MySQL) will return a 
> Long object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is 
> because the logic bug is not exposed when the MEDIUMINT is signed (as it 
> satisfies the other condition of the if clause). But if a field is declared 
> as INT(9) UNSIGNED, the logic will see if the precision is 9 digits or less, 
> and treat it as an Integer if so. But an unsigned value with 9 digits will 
> fit in a Long, and should be treated as such in the schema.
> Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
> MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (NIFI-4738) ExecuteSQL error when table uses 9-digit unsigned int

2018-01-04 Thread Matt Burgess (JIRA)
Matt Burgess created NIFI-4738:
--

 Summary: ExecuteSQL error when table uses 9-digit unsigned int
 Key: NIFI-4738
 URL: https://issues.apache.org/jira/browse/NIFI-4738
 Project: Apache NiFi
  Issue Type: Bug
  Components: Extensions
Reporter: Matt Burgess


There is a logic bug in the fix for NIFI-3076 that causes unsigned 9-digit 
integers to be treated as ints when the DB (at least MySQL) will return a Long 
object. Although the fix for NIFI-3076 does work for MEDIUMINT, this is because 
the logic bug is not exposed when the MEDIUMINT is signed (as it satisfies the 
other condition of the if clause). But if a field is declared as INT(9) 
UNSIGNED, the logic will see if the precision is 9 digits or less, and treat it 
as an Integer if so. But an unsigned value with 9 digits will fit in a Long, 
and should be treated as such in the schema.

Recommended fix is to replace the "<= MAX_DIGITS_PER_INT" with "< 
MAX_DIGITS_PER_INT".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4595) Add ConsumeAzureEventHub processor

2018-01-04 Thread Joseph Witt (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph Witt updated NIFI-4595:
--
   Resolution: Fixed
Fix Version/s: 1.5.0
   Status: Resolved  (was: Patch Available)

updated L a little, updated dep version and corrected small rebase/class 
renaming that had happened since PR up.

+1 merged to master.

> Add ConsumeAzureEventHub processor
> --
>
> Key: NIFI-4595
> URL: https://issues.apache.org/jira/browse/NIFI-4595
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.5.0
>
>
> As reported by NIFI-2835, the existing GetAzureEventHub processor does not 
> handle partition distribution among nodes in a NiFi cluster, and handling 
> offset information across NiFi restart.
> The processor uses PartitionReceiver directly, however, there's another 
> approach to receive events from Event Hub, that is using EventProcessorHost 
> class.
> EventProcessorHost uses Azure Storage to store consumer group offset 
> information so that it can distribute consumer load among consumer instances 
> within the same group. Also this way can handle restart and NiFi cluster 
> scale scenario better.
> In addition to above enhancements, we could also add Record data model 
> capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord 
> processor.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4595) Add ConsumeAzureEventHub processor

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311880#comment-16311880
 ] 

ASF GitHub Bot commented on NIFI-4595:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2264


> Add ConsumeAzureEventHub processor
> --
>
> Key: NIFI-4595
> URL: https://issues.apache.org/jira/browse/NIFI-4595
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> As reported by NIFI-2835, the existing GetAzureEventHub processor does not 
> handle partition distribution among nodes in a NiFi cluster, and handling 
> offset information across NiFi restart.
> The processor uses PartitionReceiver directly, however, there's another 
> approach to receive events from Event Hub, that is using EventProcessorHost 
> class.
> EventProcessorHost uses Azure Storage to store consumer group offset 
> information so that it can distribute consumer load among consumer instances 
> within the same group. Also this way can handle restart and NiFi cluster 
> scale scenario better.
> In addition to above enhancements, we could also add Record data model 
> capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord 
> processor.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2264: NIFI-4595: Add ConsumeAzureEventHub.

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2264


---


[jira] [Commented] (NIFI-4595) Add ConsumeAzureEventHub processor

2018-01-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311879#comment-16311879
 ] 

ASF subversion and git services commented on NIFI-4595:
---

Commit 98af3dc4cd56da324bb4572aab2dc395338ed45d in nifi's branch 
refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=98af3dc ]

NIFI-4595: This closes #2264. Add ConsumeAzureEventHub.

Signed-off-by: joewitt 


> Add ConsumeAzureEventHub processor
> --
>
> Key: NIFI-4595
> URL: https://issues.apache.org/jira/browse/NIFI-4595
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> As reported by NIFI-2835, the existing GetAzureEventHub processor does not 
> handle partition distribution among nodes in a NiFi cluster, and handling 
> offset information across NiFi restart.
> The processor uses PartitionReceiver directly, however, there's another 
> approach to receive events from Event Hub, that is using EventProcessorHost 
> class.
> EventProcessorHost uses Azure Storage to store consumer group offset 
> information so that it can distribute consumer load among consumer instances 
> within the same group. Also this way can handle restart and NiFi cluster 
> scale scenario better.
> In addition to above enhancements, we could also add Record data model 
> capability to new ConsumeAzureEventHub. Similar to ConsumeKafkaRecord 
> processor.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4733) Issue with variable registry two phase commit

2018-01-04 Thread Matt Gilman (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Gilman updated NIFI-4733:
--
   Resolution: Fixed
Fix Version/s: 1.5.0
   Status: Resolved  (was: Patch Available)

> Issue with variable registry two phase commit
> -
>
> Key: NIFI-4733
> URL: https://issues.apache.org/jira/browse/NIFI-4733
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Affects Versions: 1.4.0
>Reporter: Matt Gilman
>Assignee: Matt Gilman
> Fix For: 1.5.0
>
>
> Need to address the logic issue during the two-phase commit exchange when 
> running clustered and invoking a variable registry update request. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit

2018-01-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311763#comment-16311763
 ] 

ASF subversion and git services commented on NIFI-4733:
---

Commit 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e in nifi's branch 
refs/heads/master from [~mcgilman]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=7a8dbb8 ]

NIFI-4733:
- Resolving logic issue in two phase commit when updating variable registry. 
This closes #2370


> Issue with variable registry two phase commit
> -
>
> Key: NIFI-4733
> URL: https://issues.apache.org/jira/browse/NIFI-4733
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Affects Versions: 1.4.0
>Reporter: Matt Gilman
>Assignee: Matt Gilman
>
> Need to address the logic issue during the two-phase commit exchange when 
> running clustered and invoking a variable registry update request. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2370: NIFI-4733: Addressing two phase commit logic issue ...

2018-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2370


---


[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311765#comment-16311765
 ] 

ASF GitHub Bot commented on NIFI-4733:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2370


> Issue with variable registry two phase commit
> -
>
> Key: NIFI-4733
> URL: https://issues.apache.org/jira/browse/NIFI-4733
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Affects Versions: 1.4.0
>Reporter: Matt Gilman
>Assignee: Matt Gilman
>
> Need to address the logic issue during the two-phase commit exchange when 
> running clustered and invoking a variable registry update request. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4733) Issue with variable registry two phase commit

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311761#comment-16311761
 ] 

ASF GitHub Bot commented on NIFI-4733:
--

Github user mcgilman commented on the issue:

https://github.com/apache/nifi/pull/2370
  
Thanks @alopresto! I've addressed the comments here and I will merge to 
master.


> Issue with variable registry two phase commit
> -
>
> Key: NIFI-4733
> URL: https://issues.apache.org/jira/browse/NIFI-4733
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Affects Versions: 1.4.0
>Reporter: Matt Gilman
>Assignee: Matt Gilman
>
> Need to address the logic issue during the two-phase commit exchange when 
> running clustered and invoking a variable registry update request. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


  1   2   >