[2/2] nifi git commit: NIFI-3933: - When monitoring heartbeats use the connected nodes as the basis for the check. This addresses the case when a node is terminated and no corresponding heartbeats exi

2017-05-22 Thread markap14
NIFI-3933:
- When monitoring heartbeats use the connected nodes as the basis for the 
check. This addresses the case when a node is terminated and no corresponding 
heartbeats exist.

This closes #1838.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ded396f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ded396f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ded396f0

Branch: refs/heads/master
Commit: ded396f0ef2c090456a532f2ce427525c8a1a4b6
Parents: d33c4c7
Author: Matt Gilman 
Authored: Mon May 22 15:28:30 2017 -0400
Committer: Mark Payne 
Committed: Mon May 22 16:51:13 2017 -0400

--
 .../nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java | 7 ---
 .../coordination/heartbeat/AbstractHeartbeatMonitor.java  | 7 +++
 2 files changed, 7 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/ded396f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
--
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
index 3cc5fd0..6a0937d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -59,13 +59,6 @@ public interface HeartbeatMonitor {
 void purgeHeartbeats();
 
 /**
- * Returns when the heartbeats were purged last.
- *
- * @return when the heartbeats were purged last
- */
-long getPurgeTimestamp();
-
-/**
  * @return the address that heartbeats should be sent to when this node is 
elected coordinator.
  */
 String getHeartbeatAddress();

http://git-wip-us.apache.org/repos/asf/nifi/blob/ded396f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
--
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index c5d9e4b..4c251f9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -282,6 +282,13 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
 protected abstract Map 
getLatestHeartbeats();
 
 /**
+ * Returns when the heartbeats were purged last.
+ *
+ * @return when the heartbeats were purged last
+ */
+protected abstract long getPurgeTimestamp();
+
+/**
  * This method does nothing in the abstract class but is meant for
  * subclasses to override in order to provide functionality when the 
monitor
  * is started.



[1/2] nifi git commit: NIFI-3933: - When monitoring heartbeats use the connected nodes as the basis for the check. This addresses the case when a node is terminated and no corresponding heartbeats exi

2017-05-22 Thread markap14
Repository: nifi
Updated Branches:
  refs/heads/master a1b07b1e9 -> ded396f0e


NIFI-3933:
- When monitoring heartbeats use the connected nodes as the basis for the 
check. This addresses the case when a node is terminated and no corresponding 
heartbeats exist.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d33c4c72
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d33c4c72
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d33c4c72

Branch: refs/heads/master
Commit: d33c4c72d49979ab04db489e429dd202d51585b1
Parents: a1b07b1
Author: Matt Gilman 
Authored: Mon May 22 15:28:30 2017 -0400
Committer: Mark Payne 
Committed: Mon May 22 16:50:30 2017 -0400

--
 .../heartbeat/HeartbeatMonitor.java |  7 ++
 .../heartbeat/AbstractHeartbeatMonitor.java | 41 ---
 .../ClusterProtocolHeartbeatMonitor.java| 37 ++
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 75 +++-
 4 files changed, 119 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
--
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
index 6a0937d..3cc5fd0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -59,6 +59,13 @@ public interface HeartbeatMonitor {
 void purgeHeartbeats();
 
 /**
+ * Returns when the heartbeats were purged last.
+ *
+ * @return when the heartbeats were purged last
+ */
+long getPurgeTimestamp();
+
+/**
  * @return the address that heartbeats should be sent to when this node is 
elected coordinator.
  */
 String getHeartbeatAddress();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
--
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 5119dac..c5d9e4b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -154,19 +154,38 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
 
 // Disconnect any node that hasn't sent a heartbeat in a long time (8 
times the heartbeat interval)
 final long maxMillis = heartbeatIntervalMillis * 8;
-final long threshold = System.currentTimeMillis() - maxMillis;
-for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
-if (heartbeat.getTimestamp() < threshold) {
-final long secondsSinceLastHeartbeat = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
heartbeat.getTimestamp());
+final long currentTimestamp = System.currentTimeMillis();
+final long threshold = currentTimestamp - maxMillis;
 
-
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), 
DisconnectionCode.LACK_OF_HEARTBEAT,
-"Have not received a heartbeat from node in " + 
secondsSinceLastHeartbeat + " seconds");
+// consider all connected nodes
+for (final NodeIdentifier nodeIdentifier : 
clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED)) {
+final NodeHeartbeat heartbeat = 
latestHeartbeats.get(nodeIdentifier);
 
-try 

nifi git commit: NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when determining the schema

2017-05-22 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master 6937a6cf6 -> a1b07b1e9


NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when 
determining the schema

This closes #1839.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1b07b1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1b07b1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1b07b1e

Branch: refs/heads/master
Commit: a1b07b1e9c388d8642699b1d8b101a606dc5bd6a
Parents: 6937a6c
Author: Mark Payne 
Authored: Mon May 22 16:04:34 2017 -0400
Committer: Bryan Bende 
Committed: Mon May 22 16:30:47 2017 -0400

--
 .../java/org/apache/nifi/grok/GrokReader.java   | 45 ++--
 .../apache/nifi/grok/TestGrokRecordReader.java  | 40 +
 2 files changed, 73 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1b07b1e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index a874632..dcf8b5a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -133,35 +133,56 @@ public class GrokReader extends SchemaRegistryService 
implements RecordReaderFac
 
 appendUnmatchedLine = 
context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
 
-this.recordSchema = createRecordSchema(grok);
+final String schemaAccess = 
context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
+if 
(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
+this.recordSchema = createRecordSchema(grok);
+} else {
+this.recordSchema = null;
+}
 }
 
 static RecordSchema createRecordSchema(final Grok grok) {
 final List fields = new ArrayList<>();
 
 String grokExpression = grok.getOriginalGrokPattern();
+populateSchemaFieldNames(grok, grokExpression, fields);
+
+fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, 
RecordFieldType.STRING.getDataType()));
+
+final RecordSchema schema = new SimpleRecordSchema(fields);
+return schema;
+}
+
+private static void populateSchemaFieldNames(final Grok grok, String 
grokExpression, final List fields) {
 while (grokExpression.length() > 0) {
 final Matcher matcher = 
GrokUtils.GROK_PATTERN.matcher(grokExpression);
 if (matcher.find()) {
-final Map namedGroups = 
GrokUtils.namedGroups(matcher, grokExpression);
-final String fieldName = namedGroups.get("subname");
+final Map extractedGroups = 
GrokUtils.namedGroups(matcher, grokExpression);
+final String subName = extractedGroups.get("subname");
 
-DataType dataType = RecordFieldType.STRING.getDataType();
-final RecordField recordField = new RecordField(fieldName, 
dataType);
-fields.add(recordField);
+if (subName == null) {
+final String subPatternName = 
extractedGroups.get("pattern");
+if (subPatternName == null) {
+continue;
+}
+
+final String subExpression = 
grok.getPatterns().get(subPatternName);
+populateSchemaFieldNames(grok, subExpression, fields);
+} else {
+DataType dataType = RecordFieldType.STRING.getDataType();
+final RecordField recordField = new RecordField(subName, 
dataType);
+fields.add(recordField);
+}
 
 if (grokExpression.length() > matcher.end() + 1) {
-grokExpression = grokExpression.substring(matcher.end() + 
1);
+grokExpression = grokExpression.substring(matcher.end());
 } else {
 

[jira] [Commented] (MINIFI-207) findProcessor in ProcessGroup not thread safe

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020127#comment-16020127
 ] 

ASF GitHub Bot commented on MINIFI-207:
---

GitHub user phrocker opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/102

    MINIFI-207: Use recursive mutex to avoid thread safety concerns

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/phrocker/nifi-minifi-cpp MINIFI-207

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #102






> findProcessor in ProcessGroup not thread safe
> -
>
> Key: MINIFI-207
> URL: https://issues.apache.org/jira/browse/MINIFI-207
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.1.0
>Reporter: marco polo
>Assignee: marco polo
>
> These functions do not use the mutex. The sets can change the underlying 
> tree upon insertion. Since the call flow cannot be guaranteed, we should use 
> the mutex here (i.e. the function can be called in any order since it is public). 
> Add and remove connection call this function and lock the mutex themselves. 
> An alternative might be to use a recursive mutex or dramatically reduce the 
> critical section in add/remove connection. This appears more desirable. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (MINIFI-320) In cases where users run TLSSocket, the context will cause a realloc in multiple threads.

2017-05-22 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo resolved MINIFI-320.
---
Resolution: Fixed

> In cases where users run TLSSocket, the context will cause a realloc in 
> multiple threads. 
> --
>
> Key: MINIFI-320
> URL: https://issues.apache.org/jira/browse/MINIFI-320
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> An example stacktrace follows. We can either move *back* to a singleton or simply 
> move the SSL library initialization to a better place. 
> #0 0x10122d520 in wrap_realloc 
> (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x56520)
> #1 0x100fe80ae in CRYPTO_realloc (libcrypto.1.0.0.dylib:x86_64+0x20ae)
> #2 0x10109880c in lh_insert (libcrypto.1.0.0.dylib:x86_64+0xb280c)
> #3 0x100fea6e7 in OBJ_NAME_add (libcrypto.1.0.0.dylib:x86_64+0x46e7)
> #4 0x1010a5b41 in OpenSSL_add_all_ciphers 
> (libcrypto.1.0.0.dylib:x86_64+0xbfb41)
> #5 0x1010a58d1 in OPENSSL_add_all_algorithms_noconf 
> (libcrypto.1.0.0.dylib:x86_64+0xbf8d1)
> #6 0x10012ad40 in org::apache::nifi::minifi::io::TLSContext::initialize() 
> TLSSocket.cpp:60
> #7 0x100132255 in org::apache::nifi::minifi::io::TLSSocket::initialize() 
> TLSSocket.cpp:166
> #8 0x100677a68 in org::apache::nifi::minifi::Site2SitePeer::Open() 
> Site2SitePeer.cpp:42
> #9 0x100631543 in 
> org::apache::nifi::minifi::Site2SiteClientProtocol::establish() 
> Site2SiteClientProtocol.cpp:46
> #10 0x10063e9ff in 
> org::apache::nifi::minifi::Site2SiteClientProtocol::bootstrap() 
> Site2SiteClientProtocol.cpp:483
> #11 0x100600880 in 
> org::apache::nifi::minifi::RemoteProcessorGroupPort::onTrigger(org::apache::nifi::minifi::core::ProcessContext*,
>  org::apache::nifi::minifi::core::ProcessSession*) 
> RemoteProcessorGroupPort.cpp:148
> #12 0x1002c1a3a in 
> org::apache::nifi::minifi::core::Processor::onTrigger(org::apache::nifi::minifi::core::ProcessContext*,
>  org::apache::nifi::minifi::core::ProcessSessionFactory*) Processor.cpp:235
> #13 0x10062cc20 in 
> org::apache::nifi::minifi::SchedulingAgent::onTrigger(std::__1::shared_ptr,
>  org::apache::nifi::minifi::core::ProcessContext*, 
> org::apache::nifi::minifi::core::ProcessSessionFactory*) 
> SchedulingAgent.cpp:66
> #14 0x10069b933 in 
> org::apache::nifi::minifi::TimerDrivenSchedulingAgent::run(std::__1::shared_ptr,
>  org::apache::nifi::minifi::core::ProcessContext*, 
> org::apache::nifi::minifi::core::ProcessSessionFactory*) 
> TimerDrivenSchedulingAgent.cpp:37
> #15 0x100690205 in 
> org::apache::nifi::minifi::ThreadedSchedulingAgent::schedule(std::__1::shared_ptr)::$_0::operator()()
>  const ThreadedSchedulingAgent.cpp:97
> #16 0x10068f37d in void* 
> std::__1::__thread_proxy
>  >(void*) __functional_base:416
> #17 0x7fffb45cf9ae in _pthread_body 
> (libsystem_pthread.dylib:x86_64+0x39ae)
> #18 0x7fffb45cf8fa in _pthread_start 
> (libsystem_pthread.dylib:x86_64+0x38fa)
> #19 0x7fffb45cf100 in thread_start (libsystem_pthread.dylib:x86_64+0x3100)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (MINIFI-320) In cases where users run TLSSocket, the context will cause a realloc in multiple threads.

2017-05-22 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo reassigned MINIFI-320:
-

Assignee: marco polo

> In cases where users run TLSSocket, the context will cause a realloc in 
> multiple threads. 
> --
>
> Key: MINIFI-320
> URL: https://issues.apache.org/jira/browse/MINIFI-320
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> An example stacktrace follows. We can either move *back* to a singleton or simply 
> move the SSL library initialization to a better place. 
> #0 0x10122d520 in wrap_realloc 
> (libclang_rt.asan_osx_dynamic.dylib:x86_64h+0x56520)
> #1 0x100fe80ae in CRYPTO_realloc (libcrypto.1.0.0.dylib:x86_64+0x20ae)
> #2 0x10109880c in lh_insert (libcrypto.1.0.0.dylib:x86_64+0xb280c)
> #3 0x100fea6e7 in OBJ_NAME_add (libcrypto.1.0.0.dylib:x86_64+0x46e7)
> #4 0x1010a5b41 in OpenSSL_add_all_ciphers 
> (libcrypto.1.0.0.dylib:x86_64+0xbfb41)
> #5 0x1010a58d1 in OPENSSL_add_all_algorithms_noconf 
> (libcrypto.1.0.0.dylib:x86_64+0xbf8d1)
> #6 0x10012ad40 in org::apache::nifi::minifi::io::TLSContext::initialize() 
> TLSSocket.cpp:60
> #7 0x100132255 in org::apache::nifi::minifi::io::TLSSocket::initialize() 
> TLSSocket.cpp:166
> #8 0x100677a68 in org::apache::nifi::minifi::Site2SitePeer::Open() 
> Site2SitePeer.cpp:42
> #9 0x100631543 in 
> org::apache::nifi::minifi::Site2SiteClientProtocol::establish() 
> Site2SiteClientProtocol.cpp:46
> #10 0x10063e9ff in 
> org::apache::nifi::minifi::Site2SiteClientProtocol::bootstrap() 
> Site2SiteClientProtocol.cpp:483
> #11 0x100600880 in 
> org::apache::nifi::minifi::RemoteProcessorGroupPort::onTrigger(org::apache::nifi::minifi::core::ProcessContext*,
>  org::apache::nifi::minifi::core::ProcessSession*) 
> RemoteProcessorGroupPort.cpp:148
> #12 0x1002c1a3a in 
> org::apache::nifi::minifi::core::Processor::onTrigger(org::apache::nifi::minifi::core::ProcessContext*,
>  org::apache::nifi::minifi::core::ProcessSessionFactory*) Processor.cpp:235
> #13 0x10062cc20 in 
> org::apache::nifi::minifi::SchedulingAgent::onTrigger(std::__1::shared_ptr,
>  org::apache::nifi::minifi::core::ProcessContext*, 
> org::apache::nifi::minifi::core::ProcessSessionFactory*) 
> SchedulingAgent.cpp:66
> #14 0x10069b933 in 
> org::apache::nifi::minifi::TimerDrivenSchedulingAgent::run(std::__1::shared_ptr,
>  org::apache::nifi::minifi::core::ProcessContext*, 
> org::apache::nifi::minifi::core::ProcessSessionFactory*) 
> TimerDrivenSchedulingAgent.cpp:37
> #15 0x100690205 in 
> org::apache::nifi::minifi::ThreadedSchedulingAgent::schedule(std::__1::shared_ptr)::$_0::operator()()
>  const ThreadedSchedulingAgent.cpp:97
> #16 0x10068f37d in void* 
> std::__1::__thread_proxy
>  >(void*) __functional_base:416
> #17 0x7fffb45cf9ae in _pthread_body 
> (libsystem_pthread.dylib:x86_64+0x39ae)
> #18 0x7fffb45cf8fa in _pthread_start 
> (libsystem_pthread.dylib:x86_64+0x38fa)
> #19 0x7fffb45cf100 in thread_start (libsystem_pthread.dylib:x86_64+0x3100)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (MINIFI-321) Stub out properties with their defaults to minifi.properties

2017-05-22 Thread marco polo (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

marco polo reassigned MINIFI-321:
-

Assignee: marco polo

> Stub out properties with their defaults to minifi.properties
> 
>
> Key: MINIFI-321
> URL: https://issues.apache.org/jira/browse/MINIFI-321
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: Agent Configuration/Installation, C++
>Reporter: Aldrin Piri
>Assignee: marco polo
>
> MINIFI-37 provided some additional implementations for repositories.  It 
> would be helpful to have these stubbed out in minifi.properties with their 
> defaults as well as similar other properties that a user may wish to change.  
> For properties not yet exposed we should do so and provide the relevant 
> updates.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (MINIFI-37) Native Volatile Content Repository implementation

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-37?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020083#comment-16020083
 ] 

ASF GitHub Bot commented on MINIFI-37:
--

Github user phrocker commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/98
  
@apiri This should be good once travis deems it so. I'm adding some 
information on the wiki on configuring this and other portions of MiNiFi cpp


> Native Volatile Content Repository implementation
> -
>
> Key: MINIFI-37
> URL: https://issues.apache.org/jira/browse/MINIFI-37
> Project: Apache NiFi MiNiFi
>  Issue Type: Task
>  Components: C++, Core Framework
>Reporter: Aldrin Piri
>Assignee: marco polo
>Priority: Minor
>  Labels: native
>
> Given the constrained environments in which MiNiFi could operate, it would be 
> beneficial to provide a content repository that is strictly in memory for 
> those environments where disk storage may be limited or non-existent. 
> This implementation should consider configuration options around its 
> footprint such as number of entries held and/or sheer capacity.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


nifi git commit: NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10 Also, updated record writers to ensure that they write the schema as app

2017-05-22 Thread joewitt
Repository: nifi
Updated Branches:
  refs/heads/master 6d16fdf17 -> 6937a6cf6


NIFI-3953: This closes #1837. Allow multiple schemas on same kafka 
topic/partition for ConsumeKafkaRecord_0_10
Also, updated record writers to ensure that they write the schema as 
appropriate if not using a RecordSet. Updated ConsumeKafkaRecord to allow for 
multiple schemas to be on same topic and partition

Signed-off-by: joewitt 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6937a6cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6937a6cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6937a6cf

Branch: refs/heads/master
Commit: 6937a6cf64c2f9e437b96550259575488eb284ec
Parents: 6d16fdf
Author: Mark Payne 
Authored: Mon May 22 12:02:18 2017 -0400
Committer: joewitt 
Committed: Mon May 22 14:37:53 2017 -0400

--
 .../serialization/AbstractRecordSetWriter.java  |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 179 ++-
 .../processors/kafka/pubsub/PublisherLease.java |   1 +
 .../avro/WriteAvroResultWithExternalSchema.java |   8 +-
 .../org/apache/nifi/csv/WriteCSVResult.java |   6 +
 .../org/apache/nifi/json/WriteJsonResult.java   |   7 +
 6 files changed, 120 insertions(+), 85 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
--
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6ce9138..4de5ce3 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements 
RecordSetWriter {
 return recordCount;
 }
 
-protected final boolean isRecordSetActive() {
+protected final boolean isActiveRecordSet() {
 return activeRecordSet;
 }
 
@@ -84,7 +84,7 @@ public abstract class AbstractRecordSetWriter implements 
RecordSetWriter {
 
 @Override
 public final WriteResult finishRecordSet() throws IOException {
-if (!isRecordSetActive()) {
+if (!isActiveRecordSet()) {
 throw new IllegalStateException("Cannot finish RecordSet because 
no RecordSet has begun");
 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
--
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 563ece6..effd2e4 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -21,22 +21,18 @@ import static 
org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.RE
 import static 
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
 import static 
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -57,11 +53,10 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import 

nifi git commit: NIFI-3952: Updated UpdateRecord to pass field-related variables to the Expression Language

2017-05-22 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master 1a3c525dd -> 6d16fdf17


NIFI-3952: Updated UpdateRecord to pass field-related variables to the 
Expression Language

This closes #1836.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6d16fdf1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6d16fdf1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6d16fdf1

Branch: refs/heads/master
Commit: 6d16fdf170db72cca41ef5c450f2b75ae2e74699
Parents: 1a3c525
Author: Mark Payne 
Authored: Mon May 22 10:51:31 2017 -0400
Committer: Bryan Bende 
Committed: Mon May 22 14:32:45 2017 -0400

--
 .../nifi-standard-processors/pom.xml|  1 +
 .../nifi/processors/standard/UpdateRecord.java  | 27 +++--
 .../additionalDetails.html  | 58 
 .../processors/standard/TestUpdateRecord.java   | 30 ++
 .../output/person-with-capital-lastname.json|  7 +++
 5 files changed, 119 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
--
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index a0932e0..d8021e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -457,6 +457,7 @@
 
src/test/resources/TestUpdateRecord/input/person.json
 
src/test/resources/TestUpdateRecord/output/person-with-firstname.json
 
src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json
+
src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json
 
src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc
 
src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc
 
src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc

http://git-wip-us.apache.org/repos/asf/nifi/blob/6d16fdf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 9151cde..6acc789 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -20,21 +20,24 @@ package org.apache.nifi.processors.standard;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
@@ -47,6 +50,7 @@ import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 
 @EventDriven
@@ -60,12 +64,16 @@ import 

[jira] [Resolved] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread Aldrin Piri (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aldrin Piri resolved MINIFI-325.

Resolution: Fixed

> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.2.0
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019809#comment-16019809
 ] 

ASF GitHub Bot commented on MINIFI-325:
---

Github user asfgit closed the pull request at:

https://github.com/apache/nifi-minifi-cpp/pull/101


> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.2.0
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread Aldrin Piri (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aldrin Piri updated MINIFI-325:
---
Affects Version/s: cpp-0.2.0

> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.2.0
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread Aldrin Piri (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aldrin Piri updated MINIFI-325:
---
Component/s: C++

> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.2.0
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread Aldrin Piri (JIRA)

 [ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aldrin Piri updated MINIFI-325:
---
Fix Version/s: cpp-0.3.0

> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>  Components: C++
>Affects Versions: cpp-0.2.0
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
> Fix For: cpp-0.3.0
>
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019808#comment-16019808
 ] 

ASF subversion and git services commented on MINIFI-325:


Commit 9f98a407c348645ac234a910b19c8c468d24f230 in nifi-minifi-cpp's branch 
refs/heads/master from Marc Parisi
[ https://git-wip-us.apache.org/repos/asf?p=nifi-minifi-cpp.git;h=9f98a40 ]

MINIFI-325: Set stream_factory_ from FlowConfiguration constructor

This closes #101.

Signed-off-by: Aldrin Piri 


> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


nifi-minifi-cpp git commit: MINIFI-325: Set stream_factory_ from FlowConfiguration constructor

2017-05-22 Thread aldrin
Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 76f06675c -> 9f98a407c


MINIFI-325: Set stream_factory_ from FlowConfiguration constructor

This closes #101.

Signed-off-by: Aldrin Piri 


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9f98a407
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9f98a407
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9f98a407

Branch: refs/heads/master
Commit: 9f98a407c348645ac234a910b19c8c468d24f230
Parents: 76f0667
Author: Marc Parisi 
Authored: Sat May 20 16:41:25 2017 -0400
Committer: Aldrin Piri 
Committed: Mon May 22 12:48:12 2017 -0400

--
 libminifi/include/core/FlowConfiguration.h  |   3 +-
 .../integration/ProvenanceReportingTest.cpp | 115 +++
 .../test/resources/TestProvenanceReporting.yml  |  72 
 3 files changed, 189 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f98a407/libminifi/include/core/FlowConfiguration.h
--
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index 2e704b5..b918aac 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -65,7 +65,8 @@ class FlowConfiguration : public CoreComponent {
  const std::string path)
   : CoreComponent(core::getClassName()),
 flow_file_repo_(flow_file_repo),
-config_path_(path) {
+config_path_(path),
+stream_factory_(stream_factory) {
 controller_services_ = std::make_shared<
 core::controller::ControllerServiceMap>();
 service_provider_ = std::make_shared<

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f98a407/libminifi/test/integration/ProvenanceReportingTest.cpp
--
diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp 
b/libminifi/test/integration/ProvenanceReportingTest.cpp
new file mode 100644
index 000..ff5d563
--- /dev/null
+++ b/libminifi/test/integration/ProvenanceReportingTest.cpp
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/LogAppenders.h"
+#include "core/logging/BaseLogger.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+
+
+void waitToVerifyProcessor() {
+  std::this_thread::sleep_for(std::chrono::seconds(2));
+}
+
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  if (argc > 1) {
+test_file_location = argv[1];
+  }
+  mkdir("/tmp/aljs39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+  mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+  std::ostringstream oss;
+  std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr<
+  logging::BaseLogger>(
+  new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,
+ 0));
+  std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger();
+  logger->updateLogger(std::move(outputLogger));
+  logger->setLogLevel("debug");
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<
+  minifi::Configure>();
+
+  std::shared_ptr test_repo =
+  std::make_shared();
+  std::shared_ptr test_flow_repo = std::make_shared<
+  TestFlowRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file,
+ test_file_location);
+  std::shared_ptr stream_factory = std::make_shared<
+  

[jira] [Commented] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019804#comment-16019804
 ] 

ASF GitHub Bot commented on MINIFI-325:
---

Github user apiri commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/101
  
Code and test look good. Verified functionality. Will get this 
incorporated, thanks!


> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


nifi git commit: NIFI-3951: Fixed bug that calculated the index incorrectly when filtering for ArrayIndexPath

2017-05-22 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master 7f8987471 -> 1a3c525dd


NIFI-3951: Fixed bug that calculated the index incorrectly when filtering for 
ArrayIndexPath

This closes #1835.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1a3c525d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1a3c525d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1a3c525d

Branch: refs/heads/master
Commit: 1a3c525dd68d7ad5403a23aaabda131edbf489fa
Parents: 7f89874
Author: Mark Payne 
Authored: Mon May 22 09:45:35 2017 -0400
Committer: Bryan Bende 
Committed: Mon May 22 12:39:32 2017 -0400

--
 .../org/apache/nifi/record/path/paths/ArrayIndexPath.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/1a3c525d/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
--
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
index 287ae2d..3e81868 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ArrayIndexPath.java
@@ -42,16 +42,20 @@ public class ArrayIndexPath extends RecordPathSegment {
 
 return parentResult
 .filter(Filters.fieldTypeFilter(RecordFieldType.ARRAY))
-.filter(fieldValue -> fieldValue.getValue() != null && ((Object[]) 
fieldValue.getValue()).length >= Math.abs(index) - 1)
+.filter(fieldValue -> fieldValue.getValue() != null && ((Object[]) 
fieldValue.getValue()).length > getArrayIndex(((Object[]) 
fieldValue.getValue()).length))
 .map(fieldValue -> {
 final ArrayDataType arrayDataType = (ArrayDataType) 
fieldValue.getField().getDataType();
 final DataType elementDataType = 
arrayDataType.getElementType();
 final RecordField arrayField = new 
RecordField(fieldValue.getField().getFieldName(), elementDataType);
 final Object[] values = (Object[]) fieldValue.getValue();
-final int arrayIndex = index < 0 ? values.length + index : 
index;
+final int arrayIndex = getArrayIndex(values.length);
 final RecordField elementField = new 
RecordField(arrayField.getFieldName() + "[" + arrayIndex + "]", 
elementDataType);
 final FieldValue result = new 
ArrayIndexFieldValue(values[arrayIndex], elementField, fieldValue, arrayIndex);
 return result;
 });
 }
+
+private int getArrayIndex(final int arrayLength) {
+return index < 0 ? arrayLength + index : index;
+}
 }



[jira] [Commented] (MINIFI-325) FlowConfiguration does not appropriately propagate stream factory

2017-05-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFI-325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019752#comment-16019752
 ] 

ASF GitHub Bot commented on MINIFI-325:
---

Github user apiri commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/101
  
reviewing


> FlowConfiguration does not appropriately propagate stream factory
> -
>
> Key: MINIFI-325
> URL: https://issues.apache.org/jira/browse/MINIFI-325
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>Reporter: marco polo
>Assignee: marco polo
>Priority: Blocker
>
> FlowConfiguration does not appropriately propagate stream factory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


nifi git commit: NIFI-3946: Update LookupService to take a Map instead of a String for the input NIFI-3946: Fixed issues where null values were returned instead of empty optionals

2017-05-22 Thread bbende
Repository: nifi
Updated Branches:
  refs/heads/master 71cd497fe -> 7f8987471


NIFI-3946: Update LookupService to take a Map instead of a String for the input
NIFI-3946: Fixed issues where null values were returned instead of empty 
optionals

This closes #1833.

Signed-off-by: Bryan Bende 


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f898747
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f898747
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f898747

Branch: refs/heads/master
Commit: 7f8987471d18deececd71bed0cbdee28b9c6254f
Parents: 71cd497
Author: Mark Payne 
Authored: Fri May 19 16:42:51 2017 -0400
Committer: Bryan Bende 
Committed: Mon May 22 11:52:41 2017 -0400

--
 .../lookup/script/ScriptedLookupService.java|  14 +-
 .../script/TestScriptedLookupService.groovy |   6 +-
 .../resources/groovy/test_lookup_inline.groovy  |  11 +-
 .../nifi/processors/standard/LookupRecord.java  | 145 ++-
 .../processors/standard/TestLookupRecord.java   |  38 -
 .../org/apache/nifi/lookup/LookupService.java   |  23 ++-
 .../apache/nifi/lookup/RecordLookupService.java |  11 +-
 .../apache/nifi/lookup/StringLookupService.java |   9 +-
 .../lookup/SimpleKeyValueLookupService.java |  23 ++-
 .../nifi/lookup/maxmind/IPLookupService.java|  85 ++-
 10 files changed, 265 insertions(+), 100 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
--
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
index da846ec..ca79ba9 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
@@ -46,7 +46,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -64,9 +66,14 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
 private volatile File kerberosServiceKeytab = null;
 
 @Override
-public Optional lookup(String key) throws LookupFailureException {
+public Optional lookup(Map coordinates) throws 
LookupFailureException {
 // Delegate the lookup() call to the scripted LookupService
-return lookupService.get().lookup(key);
+return lookupService.get().lookup(coordinates);
+}
+
+@Override
+public Set getRequiredKeys() {
+return lookupService.get().getRequiredKeys();
 }
 
 @Override
@@ -177,6 +184,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
 }
 }
 
+@Override
 @OnEnabled
 public void onEnabled(final ConfigurationContext context) {
 synchronized (scriptingComponentHelper.isInitialized) {
@@ -236,6 +244,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
 }
 }
 
+@Override
 public void setup() {
 // Create a single script engine, the Processor object is reused by 
each task
 if (scriptEngine == null) {
@@ -263,6 +272,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
  * @param scriptBody An input stream associated with the script content
  * @return Whether the script was successfully reloaded
  */
+@Override
 protected boolean reloadScript(final String scriptBody) {
 // note we are starting here with a fresh listing of validation
 // results since we are (re)loading a new/updated script. any

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
--
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
 

[1/2] nifi git commit: NIFI-3942 Making IPLookupService reload the database file on the fly when detecting the file has changed

2017-05-22 Thread markap14
Repository: nifi
Updated Branches:
  refs/heads/master c49933f03 -> 71cd497fe


NIFI-3942 Making IPLookupService reload the database file on the fly when 
detecting the file has changed


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f35e0ecd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f35e0ecd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f35e0ecd

Branch: refs/heads/master
Commit: f35e0ecdd0e0fd8940b1caa688405d2128f8eb61
Parents: c49933f
Author: Bryan Bende 
Authored: Fri May 19 15:06:48 2017 -0400
Committer: Mark Payne 
Committed: Mon May 22 09:06:42 2017 -0400

--
 .../nifi/lookup/maxmind/IPLookupService.java| 135 +++
 1 file changed, 109 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/f35e0ecd/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
index 1ac6b36..88c611e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
@@ -17,16 +17,16 @@
 
 package org.apache.nifi.lookup.maxmind;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+import com.maxmind.geoip2.record.Country;
+import com.maxmind.geoip2.record.Location;
+import com.maxmind.geoip2.record.Subdivision;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -41,15 +41,19 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 
-import com.maxmind.geoip2.model.AnonymousIpResponse;
-import com.maxmind.geoip2.model.CityResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
-import com.maxmind.geoip2.model.DomainResponse;
-import com.maxmind.geoip2.model.IspResponse;
-import com.maxmind.geoip2.record.Country;
-import com.maxmind.geoip2.record.Location;
-import com.maxmind.geoip2.record.Subdivision;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", 
"cellular", "anonymous", "tor"})
 @CapabilityDescription("A lookup service that provides several types of 
enrichment information for IP addresses. The service is configured by providing 
a MaxMind "
@@ -57,7 +61,15 @@ import com.maxmind.geoip2.record.Subdivision;
 + "service to provide all of the available enrichment data may be slower 
than returning only a portion of the available enrichments. View the Usage of 
this component "
 + "and choose to view Additional Details for more information, such as the 
Schema that pertains to the information that is returned.")
 public class IPLookupService extends AbstractControllerService implements 
RecordLookupService {
+
+private volatile String databaseFile = null;
 private volatile DatabaseReader databaseReader = null;
+private volatile String databaseChecksum = null;
+private volatile long databaseLastRefreshAttempt = -1;
+
+private final Lock 

[2/2] nifi git commit: NIFI-3942 Added retry logic if a lookup fails due to InvalidDatabaseException which occurs if the underlying file was modified before we could refresh the reader

2017-05-22 Thread markap14
NIFI-3942 Added retry logic if a lookup fails due to InvalidDatabaseException 
which occurs if the underlying file was modified before we could refresh the 
reader

This closes #1831.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/71cd497f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/71cd497f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/71cd497f

Branch: refs/heads/master
Commit: 71cd497fefd9b173f6b017f69a31fc7d57e906c7
Parents: f35e0ec
Author: Bryan Bende 
Authored: Fri May 19 17:58:52 2017 -0400
Committer: Mark Payne 
Committed: Mon May 22 09:12:51 2017 -0400

--
 .../nifi/lookup/maxmind/IPLookupService.java| 50 ++--
 1 file changed, 46 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/71cd497f/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
--
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
index 88c611e..58ee4de 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.lookup.maxmind;
 
+import com.maxmind.db.InvalidDatabaseException;
 import com.maxmind.geoip2.model.AnonymousIpResponse;
 import com.maxmind.geoip2.model.CityResponse;
 import com.maxmind.geoip2.model.ConnectionTypeResponse;
@@ -188,16 +189,47 @@ public class IPLookupService extends 
AbstractControllerService implements Record
 }
 }
 
-// assign to a local so we don't need a read lock, this way another 
thread can update the member variable reference
-// while the current thread continues using the local reference
-final DatabaseReader databaseReader = this.databaseReader;
+// If an external process changes the underlying file before we have a 
chance to reload the reader, then we'll get an
+// InvalidDatabaseException, so force a reload and then retry the 
lookup one time, if we still get an error then throw it
+try {
+final DatabaseReader databaseReader = this.databaseReader;
+return doLookup(databaseReader, key);
+} catch (InvalidDatabaseException idbe) {
+if (dbWriteLock.tryLock()) {
+try {
+getLogger().debug("Attempting to reload database after 
InvalidDatabaseException");
+try {
+final File dbFile = new File(databaseFile);
+final String dbFileChecksum = getChecksum(dbFile);
+loadDatabase(dbFile, dbFileChecksum);
+databaseLastRefreshAttempt = 
System.currentTimeMillis();
+} catch (IOException ioe) {
+throw new LookupFailureException("Error reloading 
database due to: " + ioe.getMessage(), ioe);
+}
+
+getLogger().debug("Attempting to retry lookup after 
InvalidDatabaseException");
+try {
+final DatabaseReader databaseReader = 
this.databaseReader;
+return doLookup(databaseReader, key);
+} catch (final Exception e) {
+throw new LookupFailureException("Error performing 
look up: " + e.getMessage(), e);
+}
+} finally {
+dbWriteLock.unlock();
+}
+} else {
+throw new LookupFailureException("Failed to lookup the key " + 
key + " due to " + idbe.getMessage(), idbe);
+}
+}
+}
 
+private Optional doLookup(final DatabaseReader databaseReader, 
final String key) throws LookupFailureException, InvalidDatabaseException {
 final InetAddress inetAddress;
 try {
 inetAddress = InetAddress.getByName(key);
 } catch (final IOException ioe) {
 getLogger().warn("Could not resolve the IP for value '{}'. This is 
usually caused by issue resolving the appropriate DNS record or " +
-"providing the service with an invalid IP address", new