It is hard to say for sure, but I think your NiFi processor is generally ok
regarding thread safety, but I think there could be a problem in the Azure
SDK code...

RequestFactory has an instance of BaseUrl and every time
RequestFactory.create() is called, it calls BaseUrl.url().

The implementation of BaseUrl is the following (according to my IntelliJ
attaching the sources...):

public class AutoRestBaseUrl implements BaseUrl {
/** A template based URL with variables wrapped in {}s. */
private String template;
/** a mapping from {} wrapped variables in the template and their actual
values. */
private Map<CharSequence, String> mappings;

@Override
public HttpUrl url() {
String url = template;
for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {
url = url.replace(entry.getKey(), entry.getValue());
}
mappings.clear();
return HttpUrl.parse(url);
}

/**
* Creates an instance of a template based URL.
*
* @param url the template based URL to use.
*/
public AutoRestBaseUrl(String url) {
this.template = url;
this.mappings = new HashMap<>();
}

/**
* Sets the value for the {} wrapped variables in the template URL.
* @param matcher the {} wrapped variable to replace.
* @param value the value to set for the variable.
*/
public void set(CharSequence matcher, String value) {
this.mappings.put(matcher, value);
}
}

The exception is coming from the line where it is looping over the entryset:

for (Map.Entry<CharSequence, String> entry : mappings.entrySet()) {

Right after that loop it calls mappings.clear() so if the RequestFactory is
shared by multiple threads (which I think it is), then one thread could be
iterating over the set, which another calls mappings.clear().


On Fri, Jun 3, 2016 at 5:09 PM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> Kumiko
>
> It appears that the current state of the source you linked in is not in
> sync with what is in the stack trace. Perhaps you have made some code
> modifications (e.g., line 218 is an empty line in code while it has a
> pointer in the star trace).
> In any event, from what I can see the error is coming from Azure libraries
> (not NiFi). Specifically ‘com.microsoft.rest.AutoRestBaseUrl.url(..)’
> seems to be doing some iteration where I presume the remove is called.
> Perhaps it is not a thread safe class after all. What does Microsoft
> documentation says? Have you looked at the source to see what’s going on
> there? If its open please link and we can tale a look.
>
> Cheers
> Oleg
>
> On Jun 3, 2016, at 4:58 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
> Here is the code, https://github.com/kyada1/PutFileAzureDLStore.
>
> Thanks
> Kumiko
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com <bbe...@gmail.com>]
> *Sent:* Friday, June 3, 2016 12:57 PM
> *To:* users@nifi.apache.org
> *Subject:* Re: Custom processor is failing for the custom processor
>
> Hello,
>
> Would you be able to share your code for PutFileAzureDLStore so we can
> help identify if there is a concurrency problem?
>
> -Bryan
>
> On Fri, Jun 3, 2016 at 3:39 PM, Kumiko Yada <kumiko.y...@ds-iq.com> wrote:
>
> Hello,
>
> I wrote the following custom service control and processor.  When the
> custom processor is running concurrently, it’s failing often with several
> different errors.  Are there any special handlings for concurrently that I
> need to add in the custom processor?  I wrote the sample Java program which
> does the same thing as the custom processor (authenticate every time the
> file is created/create a file, create 2 threads and run concurrently), it’s
> working fine.  The custom processor also fine when this is not running
> concurrently.
>
> *Custom service control – set the properties for the Microsoft Azure
> Datalake Store*
> *Custom processor – authenticate, then create a file in Microsoft Azure
> Datalake Store*
>
> Error1:
> 2016-06-03 12:29:31,942 INFO [pool-2815-thread-1]
> c.m.aad.adal4j.AuthenticationAuthority [Correlation ID:
> 64c89876-2f7b-4fbb-8e6f-fb395a47aa87] Instance discovery was successful
> 2016-06-03 12:29:31,946 ERROR [Timer-Driven Process Thread-10]
> n.a.d.processors.PutFileAzureDLStore
> java.util.ConcurrentModificationException: null
>                 at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1429) ~[na:1.8.0_77]
>                 at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
> ~[na:1.8.0_77]
>                 at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
> ~[na:1.8.0_77]
>                 at
> com.microsoft.rest.AutoRestBaseUrl.url(AutoRestBaseUrl.java:28) ~[na:na]
>                 at retrofit2.RequestFactory.create(RequestFactory.java:50)
> ~[na:na]
>                 at retrofit2.OkHttpCall.createRawCall(OkHttpCall.java:181)
> ~[na:na]
>                 at retrofit2.OkHttpCall.execute(OkHttpCall.java:165)
> ~[na:na]
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
> ~[na:na]
>                 at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_77]
>                 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
>
> Error2:
> 2016-06-03 12:29:24,913 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> PutFileAzureDLStore[id=2133fdad-54b7-46a5-9bcf-958ddd8cfac0] due to
> uncaught Exception: com.microsoft.rest.ServiceException: Invalid status
> code 403
> 2016-06-03 12:29:24,915 WARN [Timer-Driven Process Thread-5]
> o.a.n.c.t.ContinuallyRunProcessorTask
> com.microsoft.rest.ServiceException: Invalid status code 403
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> ~[na:1.8.0_77]
>                 at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> ~[na:1.8.0_77]
>                 at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[na:1.8.0_77]
>                 at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> ~[na:1.8.0_77]
>                 at
> com.microsoft.rest.ServiceResponseBuilder.build(ServiceResponseBuilder.java:147)
> ~[na:na]
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.createDelegate(FileSystemOperationsImpl.java:1491)
> ~[na:na]
>                 at
> com.microsoft.azure.management.datalake.store.FileSystemOperationsImpl.create(FileSystemOperationsImpl.java:1432)
> ~[na:na]
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.CreateFile(PutFileAzureDLStore.java:252)
> ~[na:na]
>                 at
> nifi.azure.dlstore.processors.PutFileAzureDLStore.onTrigger(PutFileAzureDLStore.java:218)
> ~[na:na]
>                 at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
> ~[nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
> [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>                 at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_77]
>                 at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_77]
>
> Thanks
> Kumiko
>
>
>

Reply via email to