[ 
https://issues.apache.org/jira/browse/NIFI-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025879#comment-17025879
 ] 

Branko Peshevski commented on NIFI-7072:
----------------------------------------

The BinaryEncoder instances returned by
EncoderFactory.get().blockingBinaryEncoder(OutputStream out, BinaryEncoder reuse)
are not thread-safe:

https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder)
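
If a shared encoder is indeed the cause, one way to avoid the problem is to confine each encoder (and the stream it writes to) to the task that uses it. The sketch below only illustrates that pattern with the JDK; it is not the NiFi or Avro code. BufferingEncoder is a hypothetical stand-in for a buffering, non-thread-safe encoder such as Avro's BinaryEncoder, and encodeAll is an assumed helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerTaskEncoderSketch {

    /** Hypothetical stand-in for a buffering encoder that is not thread-safe. */
    static final class BufferingEncoder {
        private final OutputStream out;
        private final byte[] buffer = new byte[64];
        private int pos = 0; // unsynchronized state: the reason an instance must not be shared

        BufferingEncoder(OutputStream out) {
            this.out = out;
        }

        void writeString(String s) throws IOException {
            for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
                if (pos == buffer.length) {
                    flush(); // buffer is full, push it to the underlying stream
                }
                buffer[pos++] = b;
            }
        }

        void flush() throws IOException {
            out.write(buffer, 0, pos);
            pos = 0;
            out.flush();
        }
    }

    /** Encodes every record on a pool thread; each task owns its stream and its encoder. */
    public static List<String> encodeAll(List<String> records, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String record : records) {
                Callable<String> task = () -> {
                    // Created inside the task, so no other thread can ever see them.
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    BufferingEncoder encoder = new BufferingEncoder(out);
                    encoder.writeString(record);
                    encoder.flush();
                    return new String(out.toByteArray(), StandardCharsets.UTF_8);
                };
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // results come back in submission order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(encodeAll(Arrays.asList("first record", "second record"), 2));
    }
}
```

Because nothing is shared between tasks, no locking is needed. The cost is one buffer allocation per task, which a per-thread reuse scheme could reduce.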

> ForkRecord in Extract mode fails if concurrency is increased.
> -------------------------------------------------------------
>
>                 Key: NIFI-7072
>                 URL: https://issues.apache.org/jira/browse/NIFI-7072
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.10.0, 1.9.2, 1.11.0
>         Environment: Java 8, macOS
>            Reporter: Branko Peshevski
>            Priority: Major
>         Attachments: ForkRecord_concurrency_bug.xml
>
>
> I followed the ForkRecord example for multi-nested arrays from the [additional details 
> page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html]
>  and found that when the processor's concurrency is increased, both the 
> processor and the content repository fail.
> {code:java}
> 2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to write content to StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]; rolling back session
> java.io.IOException: Stream is closed
>     at org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
>     at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
>     at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
>     at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
>     at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
>     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
>     at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
>     at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
>     at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1580234187165-76713, container=default, section=937], offset=487914, length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
> org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
>     at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
>     at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
>     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
>     at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
>     at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
>     at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Stream is closed
> {code}
> I have attached a template that reproduces the bug. The JSON data from the 
> example was modified so that one of the accounts has more transactions, which 
> makes the bug easier to trigger.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
