Processor Question

2016-06-03 Thread Leslie Hartman

  
  
Hello:

    I would like to take a file and remove all but the
first 10 lines or the first 256 bytes. Is there a way
to do this with one of the existing processors?
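For reference, the truncation being asked about amounts to a simple head-of-file cut. Here is a plain-Java sketch of that logic (illustrative only, not an existing NiFi processor; class and method names are made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the requested truncation: keep only the first
// N lines of a file's content, or only the first M bytes.
public class TruncateDemo {
    // Keep the first n lines (fewer if the content is shorter).
    static String firstLines(String content, int n) {
        String[] lines = content.split("\n", -1);
        return String.join("\n", Arrays.copyOfRange(lines, 0, Math.min(n, lines.length)));
    }

    // Keep the first m bytes (fewer if the content is shorter).
    static byte[] firstBytes(byte[] content, int m) {
        return Arrays.copyOfRange(content, 0, Math.min(m, content.length));
    }

    public static void main(String[] args) {
        String text = "line1\nline2\nline3\nline4";
        System.out.println(firstLines(text, 2));   // line1\nline2
        byte[] head = firstBytes(text.getBytes(StandardCharsets.UTF_8), 3);
        System.out.println(head.length);           // 3
    }
}
```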

    Second question:

    Is there a processor that would remove, say, everything from
the word "unambiguously" to the end of the file?

    Say every file of type x that you were processing had the word
"unambiguously" in it and you only
wanted the content that came before that word and nothing following
it, how would one do that?

Example: Input

HTML5 defines a  menu, which is to be used
to contain the primary navigation of a web site, be it a list of
links or a form element such as a search box. This is a good idea,
as previous to this we would contain the navigation block inside
something like . Yes, you
can identify this for styling purposes pretty well, but it is a ,
and therefore semantically anonymous. 
gives us a consistent way to unambiguously define what the primary
navigation is, which is good for things like search engine
optimization, and for visually


Example: Output

HTML5 defines a  menu, which is to be used
to contain the primary navigation of a web site, be it a list of
links or a form element such as a search box. This is a good idea,
as previous to this we would contain the navigation block inside
something like . Yes, you
can identify this for styling purposes pretty well, but it is a ,
and therefore semantically anonymous. 
gives us a consistent way to 
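The cut-at-marker transformation shown in the example above can be sketched in plain Java like this (an illustrative sketch, not an existing processor; within NiFi itself something like ReplaceText with a regex such as `(?s)unambiguously.*` replaced by the empty string may do the job, though that is an assumption to verify):

```java
// Hypothetical sketch: keep everything before the first occurrence of a
// marker word, and drop the marker and everything after it.
public class CutAtMarker {
    static String before(String content, String marker) {
        int idx = content.indexOf(marker);
        return idx < 0 ? content : content.substring(0, idx);
    }

    public static void main(String[] args) {
        System.out.println(before("keep this unambiguously drop this", "unambiguously"));
        // prints: "keep this " (trailing space retained)
    }
}
```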

    Thank You.
  



RE: Custom processor is failing for concurrency

2016-06-03 Thread Kumiko Yada
Here are the call stacks with the latest changes.

2016-06-03 14:32:32,766 WARN [Timer-Driven Process Thread-4] 
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught 
Exception: com.microsoft.rest.ServiceException: Invalid status code 403
2016-06-03 14:32:32,768 WARN [Timer-Driven Process Thread-4] 
o.a.n.c.t.ContinuallyRunProcessorTask
com.microsoft.rest.ServiceException: Invalid status code 403
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
~[na:1.8.0_77]
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[na:1.8.0_77]
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[na:1.8.0_77]
at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]
at 
com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
 ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
 ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:217)
 ~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
 ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_77]
at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_77]

Here is the library code:

https://github.com/Azure/autorest-clientruntime-for-java/blob/master/client-runtime/src/main/java/com/microsoft/rest/AutoRestBaseUrl.java
https://github.com/Azure
https://github.com/Azure/azure-sdk-for-java

Thanks
Kumiko
From: Oleg Zhurakousky [mailto:ozhurakou...@hortonworks.com]
Sent: Friday, June 3, 2016 2:10 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for concurrency

Kumiko

It appears that the current state of the source you linked is not in sync 
with what is in the stack trace. Perhaps you have made some code modifications 
(e.g., line 218 is an empty line in the code while it has a pointer in the stack 
trace).
In any event, from what I can see the error is coming from the Azure libraries (not 
NiFi). Specifically, ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be 
doing some iteration where I presume the remove is called. Perhaps it is not a 
thread-safe class after all. What does the Microsoft documentation say? Have you 
looked at the source to see what’s going on there? If it’s open, please link it and 
we can take a look.

Cheers
Oleg

On Jun 3, 2016, at 4:58 PM, Kumiko Yada 
mailto:kumiko.y...@ds-iq.com>> wrote:

Here is the code, https://github.com/kyada1/PutFileAzureDLStore.

Thanks
Kumiko

From: Bryan Bende [mailto:bbe...@gmail.com]
Sent: Friday, June 3, 2016 12:57 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for the custom processor

Hello,

Would you be able to share your code for PutFileAzureDLStore so we can help 
identify if there is a concurrency problem?

-Bryan

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada 
mailto:kumiko.y...@ds-iq.com>> wrote:
Hello,

I wrote the following custom service control and processor.  When the custom 
processor

Re: Custom processor is failing for concurrency

2016-06-03 Thread Bryan Bende
It is hard to say for sure, but your NiFi processor looks generally OK
regarding thread safety; I think there could be a problem in the Azure
SDK code...

RequestFactory has an instance of BaseUrl and every time
RequestFactory.create() is called, it calls BaseUrl.url().

The implementation of BaseUrl is the following (according to my IntelliJ
attaching the sources...):

public class AutoRestBaseUrl implements BaseUrl {
/** A template based URL with variables wrapped in {}s. */
private String template;
/** a mapping from {} wrapped variables in the template and their actual
values. */
private Map<CharSequence, String> mappings;

@Override
public HttpUrl url() {
String url = template;
for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
url = url.replace(entry.getKey(), entry.getValue());
}
mappings.clear();
return HttpUrl.parse(url);
}

/**
* Creates an instance of a template based URL.
*
* @param url the template based URL to use.
*/
public AutoRestBaseUrl(String url) {
this.template = url;
this.mappings = new HashMap<>();
}

/**
* Sets the value for the {} wrapped variables in the template URL.
* @param matcher the {} wrapped variable to replace.
* @param value the value to set for the variable.
*/
public void set(CharSequence matcher, String value) {
this.mappings.put(matcher, value);
}
}

The exception is coming from the line where it is looping over the entry set:

for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {

Right after that loop it calls mappings.clear(), so if the RequestFactory is
shared by multiple threads (which I think it is), then one thread could be
iterating over the set while another calls mappings.clear().
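That fail-fast failure mode is easy to reproduce in isolation. Below is a minimal standalone sketch (not the Azure SDK itself; the class and map keys are illustrative). HashMap's iterator checks its modCount, so a structural modification such as clear() during iteration throws ConcurrentModificationException even on a single thread; across threads, as with the shared RequestFactory, the same check fires intermittently, which matches the sporadic failures under concurrency. A plausible fix on the SDK side would be synchronizing url()/set() or using a ConcurrentHashMap, though that would be the SDK maintainers' call.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (not the Azure SDK): clear() during iteration trips
// HashMap's fail-fast iterator deterministically.
public class FailFastDemo {
    // Returns true when the fail-fast check fires.
    static boolean reproduces() {
        Map<String, String> mappings = new HashMap<>();
        mappings.put("{a}", "1");
        mappings.put("{b}", "2");
        try {
            for (Map.Entry<String, String> e : mappings.entrySet()) {
                mappings.clear(); // structural modification mid-iteration
            }
        } catch (ConcurrentModificationException ex) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("reproduced: " + reproduces()); // reproduced: true
    }
}
```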


On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> Kumiko
>
> It appears that the current state of the source you linked is not in
> sync with what is in the stack trace. Perhaps you have made some code
> modifications (e.g., line 218 is an empty line in the code while it has a
> pointer in the stack trace).
> In any event, from what I can see the error is coming from the Azure libraries
> (not NiFi). Specifically, ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’
> seems to be doing some iteration where I presume the remove is called.
> Perhaps it is not a thread-safe class after all. What does the Microsoft
> documentation say? Have you looked at the source to see what’s going on
> there? If it’s open, please link it and we can take a look.
>
> Cheers
> Oleg
>
> On Jun 3, 2016, at 4:58 PM, Kumiko Yada  wrote:
>
> Here is the code, https://github.com/kyada1/PutFileAzureDLStore.
>
> Thanks
> Kumiko
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com ]
> *Sent:* Friday, June 3, 2016 12:57 PM
> *To:* users@nifi.apache.org
> *Subject:* Re: Custom processor is failing for the custom processor
>
> Hello,
>
> Would you be able to share your code for PutFileAzureDLStore so we can
> help identify if there is a concurrency problem?
>
> -Bryan
>
> On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada  wrote:
>
> Hello,
>
> I wrote the following custom service control and processor.  When the
> custom processor is running concurrently, it’s often failing with several
> different errors.  Is there any special handling for concurrency that I
> need to add in the custom processor?  I wrote a sample Java program that
> does the same thing as the custom processor (authenticate every time a
> file is created/create a file, create 2 threads, run concurrently), and it’s
> working fine.  The custom processor is also fine when it is not running
> concurrently.
>
> *Custom service control – set the properties for the Microsoft Azure
> Datalake Store*
> *Custom processor – authenticate, then create a file in Microsoft Azure
> Datalake Store*
>
> Error1:
> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1]
> c.m.aad.adal4j.AuthenticationAuthority [Correlation ID:
> 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10]
> n.a.d.processors.PutFileAzureDLStore
> java.util.ConcurrentModificationException: null
> at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
> at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
> ~[na:1.8.0_77]
> at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
> ~[na:1.8.0_77]
> at
> com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
> at retrofit2.RequestFactory.create(RequestFactory.java:50)
> ~[na:na]
> at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181)
> ~[na:na]
> at retrofit2.OkHttpCall.execute(OkHttpCall.java:165)
> ~[na:na]
> at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
> at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
> at
> nifi.azure.dl

Re: Custom processor is failing for concurrency

2016-06-03 Thread Oleg Zhurakousky
Kumiko

It appears that the current state of the source you linked is not in sync 
with what is in the stack trace. Perhaps you have made some code modifications 
(e.g., line 218 is an empty line in the code while it has a pointer in the stack 
trace).
In any event, from what I can see the error is coming from the Azure libraries (not 
NiFi). Specifically, ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’ seems to be 
doing some iteration where I presume the remove is called. Perhaps it is not a 
thread-safe class after all. What does the Microsoft documentation say? Have you 
looked at the source to see what’s going on there? If it’s open, please link it and 
we can take a look.

Cheers
Oleg

On Jun 3, 2016, at 4:58 PM, Kumiko Yada 
mailto:kumiko.y...@ds-iq.com>> wrote:

Here is the code, https://github.com/kyada1/PutFileAzureDLStore.

Thanks
Kumiko

From: Bryan Bende [mailto:bbe...@gmail.com]
Sent: Friday, June 3, 2016 12:57 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for the custom processor

Hello,

Would you be able to share your code for PutFileAzureDLStore so we can help 
identify if there is a concurrency problem?

-Bryan

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada 
mailto:kumiko.y...@ds-iq.com>> wrote:
Hello,

I wrote the following custom service control and processor.  When the custom 
processor is running concurrently, it’s often failing with several different 
errors.  Is there any special handling for concurrency that I need to add in 
the custom processor?  I wrote a sample Java program that does the same 
thing as the custom processor (authenticate every time a file is 
created/create a file, create 2 threads, run concurrently), and it’s working 
fine.  The custom processor is also fine when it is not running concurrently.

Custom service control – set the properties for the Microsoft Azure Datalake 
Store
Custom processor – authenticate, then create a file in Microsoft Azure Datalake 
Store

Error1:
2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] 
c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 
64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] 
n.a.d.processors.PutFileAzureDLStore
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1463) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1461) 
~[na:1.8.0_77]
at 
com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
at retrofit2.RequestFactory.create(RequestFactory.java:50) 
~[na:na]
at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) 
~[na:na]
at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
 ~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_77]
at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

Error2

RE: Custom processor is failing for concurrency

2016-06-03 Thread Kumiko Yada
Here is the code, https://github.com/kyada1/PutFileAzureDLStore.

Thanks
Kumiko

From: Bryan Bende [mailto:bbe...@gmail.com]
Sent: Friday, June 3, 2016 12:57 PM
To: users@nifi.apache.org
Subject: Re: Custom processor is failing for the custom processor

Hello,

Would you be able to share your code for PutFileAzureDLStore so we can help 
identify if there is a concurrency problem?

-Bryan

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada 
mailto:kumiko.y...@ds-iq.com>> wrote:
Hello,

I wrote the following custom service control and processor.  When the custom 
processor is running concurrently, it’s often failing with several different 
errors.  Is there any special handling for concurrency that I need to add in 
the custom processor?  I wrote a sample Java program that does the same 
thing as the custom processor (authenticate every time a file is 
created/create a file, create 2 threads, run concurrently), and it’s working 
fine.  The custom processor is also fine when it is not running concurrently.

Custom service control – set the properties for the Microsoft Azure Datalake 
Store
Custom processor – authenticate, then create a file in Microsoft Azure Datalake 
Store

Error1:
2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] 
c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 
64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] 
n.a.d.processors.PutFileAzureDLStore
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1463) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1461) 
~[na:1.8.0_77]
at 
com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
at retrofit2.RequestFactory.create(RequestFactory.java:50) 
~[na:na]
at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) 
~[na:na]
at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
 ~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_77]
at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

Error2:
2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] 
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught 
Exception: com.microsoft.rest.ServiceException: Invalid status code 403
2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] 
o.a.n.c.t.ContinuallyRunProcessorTask
com.microsoft.rest.ServiceException: Invalid status code 403
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
~[na:1.8.0_77]
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[na:1.8.0_77]
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:4

Re: Custom processor is failing for the custom processor

2016-06-03 Thread Bryan Bende
Hello,

Would you be able to share your code for PutFileAzureDLStore so we can help
identify if there is a concurrency problem?

-Bryan

On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada  wrote:

> Hello,
>
>
>
> I wrote the following custom service control and processor.  When the
> custom processor is running concurrently, it’s often failing with several
> different errors.  Is there any special handling for concurrency that I
> need to add in the custom processor?  I wrote a sample Java program that
> does the same thing as the custom processor (authenticate every time a
> file is created/create a file, create 2 threads, run concurrently), and it’s
> working fine.  The custom processor is also fine when it is not running
> concurrently.
>
>
>
> *Custom service control – set the properties for the Microsoft Azure
> Datalake Store*
>
> *Custom processor – authenticate, then create a file in Microsoft Azure
> Datalake Store*
>
>
>
> Error1:
>
> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1]
> c.m.aad.adal4j.AuthenticationAuthority [Correlation ID:
> 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
>
> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10]
> n.a.d.processors.PutFileAzureDLStore
>
> java.util.ConcurrentModificationException: null
>
> at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
>
> at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
> ~[na:1.8.0_77]
>
> at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
> ~[na:1.8.0_77]
>
> at
> com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
>
> at retrofit2.RequestFactory.create(RequestFactory.java:50)
> ~[na:na]
>
> at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181)
> ~[na:na]
>
> at retrofit2.OkHttpCall.execute(OkHttpCall.java:165)
> ~[na:na]
>
> at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
>
> at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
>
> at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
> ~[na:na]
>
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_77]
>
> at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_77]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_77]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_77]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_77]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_77]
>
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
>
>
>
> Error2:
>
> 2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to
> uncaught Exception: com.microsoft.rest.ServiceException: Invalid status
> code 403
>
> 2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask
>
> com.microsoft.rest.ServiceException: Invalid status code 403
>
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> ~[na:1.8.0_77]
>
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> ~[na:1.8.0_77]
>
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[na:1.8.0_77]
>
> at
> java.lang.reflect

Custom processor is failing for the custom processor

2016-06-03 Thread Kumiko Yada
Hello,

I wrote the following custom service control and processor.  When the custom 
processor is running concurrently, it's often failing with several different 
errors.  Is there any special handling for concurrency that I need to add in 
the custom processor?  I wrote a sample Java program that does the same 
thing as the custom processor (authenticate every time a file is 
created/create a file, create 2 threads, run concurrently), and it's working 
fine.  The custom processor is also fine when it is not running concurrently.

Custom service control - set the properties for the Microsoft Azure Datalake 
Store
Custom processor - authenticate, then create a file in Microsoft Azure Datalake 
Store

Error1:
2016-06-03 12:29:31,942 INFO [pool-2815-thread-1] 
c.m.aad.adal4j.AuthenticationAuthority [Correlation ID: 
64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10] 
n.a.d.processors.PutFileAzureDLStore
java.util.ConcurrentModificationException: null
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1463) 
~[na:1.8.0_77]
at java.util.HashMap$EntryIterator.next(HashMap.java:1461) 
~[na:1.8.0_77]
at 
com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
at retrofit2.RequestFactory.create(RequestFactory.java:50) 
~[na:na]
at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181) 
~[na:na]
at retrofit2.OkHttpCall.execute(OkHttpCall.java:165) ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
 ~[na:na]
at 
nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
 ~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_77]
at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_77]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_77]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

Error2:
2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5] 
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to uncaught 
Exception: com.microsoft.rest.ServiceException: Invalid status code 403
2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5] 
o.a.n.c.t.ContinuallyRunProcessorTask
com.microsoft.rest.ServiceException: Invalid status code 403
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
~[na:1.8.0_77]
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[na:1.8.0_77]
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[na:1.8.0_77]
at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_77]
at 
com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
 ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
 ~[na:na]
at 
com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.cre

Re: Piping commands in ExecuteStreamCommand/ExecuteProcess

2016-06-03 Thread Huagen peng
Bryan,

That was a brilliant suggestion.  I got it to work.  Many thanks.

Huagen

> On Jun 3, 2016, at 12:51 PM, Bryan Rosander  wrote:
> 
> Hi Huagen,
>  
> It looks to me like ExecuteStreamCommand is designed to run a single 
> command.  A potential workaround could be to make a shell script that 
> executes the pipeline you’d like and then use ExecuteStreamCommand to 
> invoke that.
>  
> I’d use your shell command (e.g. /bin/bash) for the command and the script 
> name for the first argument.  
> (http://stackoverflow.com/questions/18240944/cannot-launch-shell-script-with-arguments-using-java-processbuilder#answer-18241604)
>  
> 
>  
> If any of this info is bad, hopefully someone else will chime in.  I’m still 
> new around here ☺
>  
> Thanks,
> Bryan Rosander
>  
> From: Huagen peng 
> Reply-To: "users@nifi.apache.org" 
> Date: Friday, June 3, 2016 at 12:38 PM
> To: "users@nifi.apache.org" 
> Subject: Piping commands in ExecuteStreamCommand/ExecuteProcess
>  
> Hi, 
>  
> I was trying to get the ExecuteStreamCommand processor to execute one command 
> and then pipe the result right to another Linux command in the same 
> processor.  This is how I try to configure the processor:
>  
> Property  Value
> Command Arguments -r;${log_dir};|;bzip2;-;>;/tmp/test.bz2
> Command Path   zcat
>  
> The error I got is this:
> execution.command
> zcat
> execution.command.args
> -r;/data/similarweb/staging/170_16-05-22_01/log;|;bzip2;-;>;/tmp/test.bz2
> execution.error
> zcat: can't stat: | (|.gz): No such file or directory zcat: can't stat: bzip2 
> (bzip2.gz): No such file or directory gzcat: unknown compression format 
> gzcat: can't stat: > (>.gz): No such file or directory gzcat: can't stat: 
> /tmp/test.bz2 (/tmp/test.bz2.gz): No such file or directory
> previous
> Empty string set
>  
>  
> I understand that I can execute the bzip2 command in a second 
> ExecuteStreamCommand processor or simply use the CompressContent processor, 
> and I even got them to work.  The issue I was trying to avoid is having a 
> lot of data in the data flow clogging up memory.  What I wanted to 
> achieve in my naive approach is to let the series of Linux commands do 
> that job for me without bringing the huge amount of data into the multiple 
> steps of the flow.
>  
> Can piping commands in the processor be done in NiFi?
>  
> Thanks,
>  
> Huagen
>  
>  
>  



Re: Piping commands in ExecuteStreamCommand/ExecuteProcess

2016-06-03 Thread Bryan Rosander
Hi Huagen,

It looks to me like ExecuteStreamCommand is designed to run a single 
command.  A potential workaround could be to make a shell script that executes 
the pipeline you’d like and then use ExecuteStreamCommand to invoke that.

I’d use your shell command (e.g. /bin/bash) for the command and the script name 
for the first argument.  
(http://stackoverflow.com/questions/18240944/cannot-launch-shell-script-with-arguments-using-java-processbuilder#answer-18241604)
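As a sketch of that workaround (the function name, demo directory, and output path are all placeholders; `find` + `zcat` stands in for `zcat -r` so it also works where zcat lacks a recursive flag):

```shell
# compress_logs: hypothetical wrapper logic for the script Bryan describes;
# $1 is the log directory NiFi would pass in (e.g. ${log_dir}).
compress_logs() {
  # Decompress every .gz log under the directory and recompress the
  # concatenated stream into a single bzip2 file.
  find "$1" -name '*.gz' -exec zcat {} + | bzip2 > /tmp/test.bz2
}

# Demo run against a throwaway directory:
demo=$(mktemp -d)
printf 'hello\n' | gzip > "$demo/a.gz"
compress_logs "$demo"
bunzip2 -c /tmp/test.bz2    # prints: hello
```

In the real flow the function body would be a standalone script, with ExecuteStreamCommand configured with Command Path `/bin/bash` and Command Arguments `compress_logs.sh;${log_dir}`, so that the shell, not NiFi, interprets the pipe.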

If any of this info is bad, hopefully someone else will chime in.  I’m still 
new around here ☺

Thanks,
Bryan Rosander

From: Huagen peng 
Reply-To: "users@nifi.apache.org" 
Date: Friday, June 3, 2016 at 12:38 PM
To: "users@nifi.apache.org" 
Subject: Piping commands in ExecuteStreamCommand/ExecuteProcess

Hi,

I was trying to get the ExecuteStreamCommand processor to execute one command 
and then pipe the result right to another Linux command in the same processor.  
This is how I try to configure the processor:

Property  Value
Command Arguments -r;${log_dir};|;bzip2;-;>;/tmp/test.bz2
Command Path   zcat

The error I got is this:
execution.command
zcat
execution.command.args
-r;/data/similarweb/staging/170_16-05-22_01/log;|;bzip2;-;>;/tmp/test.bz2
execution.error
zcat: can't stat: | (|.gz): No such file or directory zcat: can't stat: bzip2 
(bzip2.gz): No such file or directory gzcat: unknown compression format gzcat: 
can't stat: > (>.gz): No such file or directory gzcat: can't stat: 
/tmp/test.bz2 (/tmp/test.bz2.gz): No such file or directory
previous
Empty string set


I understand that I can execute the bzip2 command in a second 
ExecuteStreamCommand processor or simply use the CompressContent processor, 
and I even got them to work.  The issue I was trying to avoid is having a 
lot of data in the data flow clogging up memory.  What I wanted to 
achieve in my naive approach is to leverage the series of Linux commands to do 
that job for me without bringing the huge amount of data into the multiple 
steps of the flow.
Can piping commands in the processor be done in NiFi?

Thanks,

Huagen





Piping commands in ExecuteStreamCommand/ExecuteProcess

2016-06-03 Thread Huagen peng
Hi,

I was trying to get the ExecuteStreamCommand processor to execute one command 
and then pipe the result right to another Linux command in the same processor.  
This is how I try to configure the processor:

Property  Value
Command Arguments -r;${log_dir};|;bzip2;-;>;/tmp/test.bz2
Command Path   zcat

The error I got is this:
execution.command
zcat
execution.command.args
-r;/data/similarweb/staging/170_16-05-22_01/log;|;bzip2;-;>;/tmp/test.bz2
execution.error
zcat: can't stat: | (|.gz): No such file or directory zcat: can't stat: bzip2 
(bzip2.gz): No such file or directory gzcat: unknown compression format 
gzcat: can't stat: > (>.gz): No such file or directory gzcat: can't stat: 
/tmp/test.bz2 (/tmp/test.bz2.gz): No such file or directory
previous
Empty string set


I understand that I can execute the bzip2 command in a second 
ExecuteStreamCommand processor or simply use the CompressContent processor, and 
I even got them to work.  The issue I was trying to avoid is having a lot of 
data in the data flow clogging up memory.  What I wanted to achieve in my 
naive approach is to leverage the series of Linux commands to do that job for me 
without bringing the huge amount of data into the multiple steps of the flow.

Can piping commands in the processor be done in NiFi?

Thanks,

Huagen
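For what it's worth, the error above arises because ExecuteStreamCommand execs the program directly rather than through a shell, so every semicolon-delimited argument, including `|` and `>`, reaches zcat as a literal file name. The same failure can be reproduced outside NiFi by quoting the metacharacters so the shell passes them through uninterpreted:

```shell
# Quoting "|" and ">" makes the shell hand them to zcat as plain
# arguments -- exactly how ExecuteStreamCommand delivers its argument
# list. zcat then tries (and fails) to open files named "|", "bzip2", etc.
out=$(zcat '|' bzip2 '-' '>' /tmp/test.bz2 < /dev/null 2>&1 || true)
echo "$out"
```

The complaints about files named `|` and `bzip2` mirror the `execution.error` shown above.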





Re: How does GetSFTP treat files being loaded?

2016-06-03 Thread Oleg Zhurakousky
Huagen

Just to clarify. There isn’t really an SFTP server. SFTP is just an FTP-like 
layer over SSH - allowing you to get an FTP-like experience when accessing a 
file system over SSH.
Now, to your question.
I am assuming you mean that some other system is uploading a large 
file. In any event, a typical and well-established pattern for upload/download 
is to use a temporary name until the upload/download has finished and then 
rename. This way the consuming system doesn’t see the “work-in-progress” until 
it is finished. In other words, the consuming system will never see that file 
until it’s fully uploaded.

Hope this helps.
Cheers
Oleg

> On Jun 3, 2016, at 10:59 AM, Huagen peng  wrote:
> 
> Hi,
> 
> I need to get files from a SFTP server and then remove the files afterward.  
> GetSFTP seems to be the processor to use.  If a user uploads a large file, 
> say 20G, to the server and the GetSFTP processor happens to be running in the 
> middle of the upload, what is the expected behavior? Does the processor 
> pick up the file? If so, can the processor get the entire file and then 
> safely delete it? Does anybody have experience with that?
> 
> Huagen



Re: How does GetSFTP treat files being loaded?

2016-06-03 Thread Bryan Bende
Adding to what Oleg said...

GetSFTP has a property called Ignore Dotted Files, which defaults to true
and tells GetSFTP to ignore filenames that begin with a dot. That can help
in this scenario if the uploader can upload to a temporary file starting with
a dot and rename it when done.

-Bryan
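The upload-side pattern the two replies describe can be simulated locally (all names here are illustrative):

```shell
# Simulate an uploader that cooperates with GetSFTP's
# "Ignore Dotted Files = true" behavior.
dir=$(mktemp -d)

# 1. Write under a dotted temporary name; GetSFTP skips dotted files,
#    so the in-progress upload is never picked up.
printf 'partial data' > "$dir/.bigfile.log.part"

# 2. Once the transfer completes, rename atomically; only now does a
#    complete file become visible for GetSFTP to fetch and delete.
mv "$dir/.bigfile.log.part" "$dir/bigfile.log"
ls "$dir"    # only bigfile.log is listed
```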


On Fri, Jun 3, 2016 at 11:10 AM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> Huagen
>
> Just to clarify. There isn’t really an SFTP server. SFTP is just an FTP-like
> layer over SSH - allowing you to get an FTP-like experience when accessing a
> file system over SSH.
> Now, to your question.
> I am assuming you mean that some other system is uploading a large
> file. In any event, a typical and well-established pattern for
> upload/download is to use a temporary name until the upload/download has
> finished and then rename. This way the consuming system doesn’t see the
> “work-in-progress” until it is finished. In other words, the consuming
> system will never see that file until it’s fully uploaded.
>
> Hope this helps.
> Cheers
> Oleg
>
> > On Jun 3, 2016, at 10:59 AM, Huagen peng  wrote:
> >
> > Hi,
> >
> > I need to get files from a SFTP server and then remove the files
> afterward.  GetSFTP seems to be the processor to use.  If a user uploads a
> large file, say 20G, to the server and the GetSFTP processor happens to be
> running in the middle of the upload, what is the expected behavior?
> Does the processor pick up the file? If so, can the processor get the
> entire file and then safely delete it? Does anybody have experience with that?
> >
> > Huagen
>
>


How does GetSFTP treat files being loaded?

2016-06-03 Thread Huagen peng
Hi,

I need to get files from a SFTP server and then remove the files afterward.  
GetSFTP seems to be the processor to use.  If a user uploads a large file, say 
20G, to the server and the GetSFTP processor happens to be running in the 
middle of the upload, what is the expected behavior? Does the processor 
pick up the file? If so, can the processor get the entire file and then safely 
delete it? Does anybody have experience with that?

Huagen

Re: Merge multiple flowfiles

2016-06-03 Thread Oleg Zhurakousky
Huagen,
I also want to apologize for my spell-checker butchering your name ;)

Cheers
Oleg

On Jun 3, 2016, at 8:03 AM, Oleg Zhurakousky <ozhurakou...@hortonworks.com> wrote:

Huge

Just to close the loop on this one, I also wanted to point out this JIRA 
https://issues.apache.org/jira/browse/NIFI-1926 for general purpose aggregation 
processor which indeed would support multiple connections, configurable 
aggregation, release and correlation strategies.
It would be nice if you can describe your use case in that JIRA, so we can 
start gathering these use cases.

Cheers
Oleg

On Jun 3, 2016, at 2:33 AM, Huagen peng <huagen.p...@gmail.com> wrote:

Thanks for the reply, Andy.

I ended up abandoning my previous approach and using ExecuteStreamCommand to 
output (with zcat command on GZ files) all the files I want to concatenate.  
Then performing some data manipulation and saving the file.

Huagen

On Jun 3, 2016, at 12:29 AM, Andy LoPresto <alopre...@apache.org> wrote:

Huagen,

Sorry, I am a little confused. My understanding is that you want to combine n 
individual logs (each with a respective flowfile) from a specific hour into a 
single file. What is confusing is when you say “Even with that [a 5* 
confirmation loop], I occasionally still get more than one merged flowfile.” Do 
you mean that what you expected to be combined into a single flowfile is output 
as two distinct and incomplete flowfiles?

Without seeing a template of your work flow, I can make a couple of suggestions.

First, as mentioned last night by James Wing, I would encourage you to look at 
the MergeContent [1] processor properties to provide a high threshold for 
merging flowfiles. If you know the number of log files per hour a priori, you 
can set that as the “Minimum Number of Entries” and ensure that output will 
wait until that many flowfiles have been accumulated.

Also, given that you have described a “loop”, I would imagine you may have 
multiple connections feeding into MergeContent. MergeContent can have 
unexpected behavior with multiple incoming connections, and so I would 
recommend adding a Funnel to aggregate all incoming connections and provide a 
single incoming connection to MergeContent.

Please let us know if this helps, and if not, please share a template and some 
sample input if possible. Thanks.

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.MergeContent/index.html


Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

On Jun 1, 2016, at 11:52 AM, Huagen peng <huagen.p...@gmail.com> wrote:

Hi,

In the data flow I am dealing with now, there are multiple (up to 200) logs 
associated with a given hour.  I need to process these fragment hourly logs and 
then concatenate them into a single file.  The approach I am using now has an 
UpdateAttribute processor to set an arbitrary segment.original.filename 
attribute on all the flowfiles I want to merge.  Then I use a MergeContent 
processor, with an UpdateAttribute and RouteOnAttribute processor to form a 
loop to confirm five times that the merge is complete.  Even with that, I 
occasionally still get more than one merged flowfile.

Is there a better way to do this?  Or should I increase the loop count, say 10?

Thanks.

Huagen
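For reference, Andy's suggestions map onto MergeContent settings roughly like the following sketch (property names are from the MergeContent documentation; the entry count of 200 and the correlation attribute come from Huagen's description, and the bin age is an arbitrary example):

```
Merge Strategy               Bin-Packing Algorithm
Correlation Attribute Name   segment.original.filename
Minimum Number of Entries    200
Max Bin Age                  10 min
```

Minimum Number of Entries keeps the bin open until all hourly fragments have arrived, while Max Bin Age bounds how long an incomplete bin can wait before being forced out, so it should exceed the worst-case arrival window.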






Re: Merge multiple flowfiles

2016-06-03 Thread Oleg Zhurakousky
Huge

Just to close the loop on this one, I also wanted to point out this JIRA 
https://issues.apache.org/jira/browse/NIFI-1926 for general purpose aggregation 
processor which indeed would support multiple connections, configurable 
aggregation, release and correlation strategies.
It would be nice if you can describe your use case in that JIRA, so we can 
start gathering these use cases.

Cheers
Oleg

On Jun 3, 2016, at 2:33 AM, Huagen peng <huagen.p...@gmail.com> wrote:

Thanks for the reply, Andy.

I ended up abandoning my previous approach and using ExecuteStreamCommand to 
output (with zcat command on GZ files) all the files I want to concatenate.  
Then performing some data manipulation and saving the file.

Huagen

On Jun 3, 2016, at 12:29 AM, Andy LoPresto <alopre...@apache.org> wrote:

Huagen,

Sorry, I am a little confused. My understanding is that you want to combine n 
individual logs (each with a respective flowfile) from a specific hour into a 
single file. What is confusing is when you say “Even with that [a 5* 
confirmation loop], I occasionally still get more than one merged flowfile.” Do 
you mean that what you expected to be combined into a single flowfile is output 
as two distinct and incomplete flowfiles?

Without seeing a template of your work flow, I can make a couple of suggestions.

First, as mentioned last night by James Wing, I would encourage you to look at 
the MergeContent [1] processor properties to provide a high threshold for 
merging flowfiles. If you know the number of log files per hour a priori, you 
can set that as the “Minimum Number of Entries” and ensure that output will 
wait until that many flowfiles have been accumulated.

Also, given that you have described a “loop”, I would imagine you may have 
multiple connections feeding into MergeContent. MergeContent can have 
unexpected behavior with multiple incoming connections, and so I would 
recommend adding a Funnel to aggregate all incoming connections and provide a 
single incoming connection to MergeContent.

Please let us know if this helps, and if not, please share a template and some 
sample input if possible. Thanks.

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.MergeContent/index.html


Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

On Jun 1, 2016, at 11:52 AM, Huagen peng <huagen.p...@gmail.com> wrote:

Hi,

In the data flow I am dealing with now, there are multiple (up to 200) logs 
associated with a given hour.  I need to process these fragment hourly logs and 
then concatenate them into a single file.  The approach I am using now has an 
UpdateAttribute processor to set an arbitrary segment.original.filename 
attribute on all the flowfiles I want to merge.  Then I use a MergeContent 
processor, with an UpdateAttribute and RouteOnAttribute processor to form a 
loop to confirm five times that the merge is complete.  Even with that, I 
occasionally still get more than one merged flowfile.

Is there a better way to do this?  Or should I increase the loop count, say 10?

Thanks.

Huagen





Re: run.as option does not work other than Nifi user

2016-06-03 Thread Andre
Are you running from the RPM?

If yes, the permissions of /opt/nifi/ are quite restrictive.

Cheers

On Fri, Jun 3, 2016 at 4:39 PM, Shashi Vishwakarma  wrote:

> Hi
>
> I want to run my NiFi application using ec2-user rather than default nifi
> user. I changed *run.as=ec2-user* in bootstrap.conf, but
> it did not work. It is not allowing me to start the NiFi application; I get
> the following error while starting the NiFi service.
>
>
>
> ./nifi.sh start
> nifi.sh: JAVA_HOME not set; results may vary
>
> Java home:
> NiFi home: /opt/nifi/current
>
> Bootstrap Config File: /opt/nifi/current/conf/bootstrap.conf
>
> Error: Could not find or load main class org.apache.nifi.bootstrap.RunNiFi
>
> Any pointer to this issue?
>
> Thanks
>
> Shashi
>