[ https://issues.apache.org/jira/browse/NIFI-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025879#comment-17025879 ]
Branko Peshevski commented on NIFI-7072:
----------------------------------------

The BinaryEncoder instances returned by EncoderFactory.get().blockingBinaryEncoder(OutputStream out, BinaryEncoder reuse) are not thread-safe, as stated in the Avro 1.8.1 Javadoc:
https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder)

> ForkRecord in Extract mode fails if concurrency is increased.
> -------------------------------------------------------------
>
>                 Key: NIFI-7072
>                 URL: https://issues.apache.org/jira/browse/NIFI-7072
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.10.0, 1.9.2, 1.11.0
>         Environment: Java 8, macOS
>            Reporter: Branko Peshevski
>            Priority: Major
>         Attachments: ForkRecord_concurrency_bug.xml
>
>
> I have followed the example for ForkRecord from the [additional details page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html] for multi-nested arrays and discovered that if the concurrency is increased, the processor fails while writing to the content repository.
> {code:java}
> 2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to write content to StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]; rolling back session
> java.io.IOException: Stream is closed
>     at org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
>     at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
>     at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
>     at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
>     at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
>     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
>     at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
>     at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
>     at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1580234187165-76713, container=default, section=937], offset=487914, length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
> org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
>     at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
>     at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
>     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
>     at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
>     at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
>     at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
>     at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
>     at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Stream is closed
> {code}
>
> I have provided a template in the attachment that reproduces the bug. The JSON data from the example was modified so that one of the accounts has more transactions, which makes the bug easier to hit.

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
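To make the thread-safety point in the comment concrete, here is a minimal stdlib-only sketch, assuming the failure mode is one encoder instance being used by more than one writer at a time. `ReusableEncoder`, `EncoderPool`, and `runConcurrently` are hypothetical names, not Avro or NiFi classes: `ReusableEncoder.configure(out)` loosely plays the role of passing an existing encoder as the `reuse` argument of `blockingBinaryEncoder`, which points it at a new stream. The pool gives each encoder to exactly one writer between `acquire` and `release`, and a writer only releases it after its final `flush()`, so no two threads ever write through the same instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class EncoderReuseSketch {

    // Hypothetical stand-in for a reusable, NOT thread-safe encoder (loosely modeled on Avro's
    // BinaryEncoder): it buffers bytes and flushes them to whichever stream it was last pointed at.
    static final class ReusableEncoder {
        private OutputStream out;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        // Re-point this instance at a new stream and drop any leftover buffered bytes.
        ReusableEncoder configure(OutputStream target) {
            this.out = target;
            this.buffer.reset();
            return this;
        }

        void writeString(String s) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buffer.write(bytes, 0, bytes.length);
        }

        void flush() throws IOException {
            buffer.writeTo(out);
            buffer.reset();
            out.flush();
        }
    }

    // Hypothetical pool: an encoder is owned by exactly one writer between acquire() and release().
    static final class EncoderPool {
        private final BlockingQueue<ReusableEncoder> recycled = new LinkedBlockingQueue<>();

        ReusableEncoder acquire(OutputStream target) {
            ReusableEncoder encoder = recycled.poll(); // null when no idle encoder is available
            return (encoder == null ? new ReusableEncoder() : encoder).configure(target);
        }

        // Call only after the owner has flushed and will never touch the encoder again.
        void release(ReusableEncoder encoder) {
            recycled.offer(encoder);
        }
    }

    // Runs `tasks` writers on a thread pool; each returns the bytes that reached its own stream.
    static List<String> runConcurrently(int tasks) throws Exception {
        EncoderPool pool = new EncoderPool();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                futures.add(executor.submit(() -> {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    ReusableEncoder encoder = pool.acquire(out);
                    try {
                        for (int n = 0; n < 100; n++) {
                            encoder.writeString("r" + id + ";");
                        }
                        encoder.flush(); // finish all writes before handing the encoder back
                    } finally {
                        pool.release(encoder);
                    }
                    return new String(out.toByteArray(), StandardCharsets.UTF_8);
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> results = runConcurrently(50);
        boolean clean = true;
        for (int i = 0; i < results.size(); i++) {
            // With exclusive ownership, each stream holds only its own task's records.
            clean &= results.get(i).replace("r" + i + ";", "").isEmpty();
        }
        System.out.println("tasks=" + results.size() + ", all outputs clean=" + clean);
    }
}
```

If two tasks were instead handed the same `ReusableEncoder` concurrently, the later `configure(out)` would re-point the shared instance at its own stream, and the earlier task's `flush()` could deliver bytes to the wrong, possibly already closed, stream. That is one plausible route to an `IOException: Stream is closed` during `BlockingBinaryEncoder.flush`, as in the trace above, but this sketch does not confirm that it is exactly what happens inside ForkRecord or WriteAvroResultWithExternalSchema.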