Hello,

We have a 3-node NiFi 1.24.0 cluster running on K8s, secured with TLS. Everything operates as expected, except on certain load-balanced queues, where many errors appear about unexpected data frame indicators and broken pipes. A sample of these errors is included below. So far, the errors only appear in the queues between three specific processors, ConvertRecord -> MergeContent -> PutMongoRecord. They cause a significant processing slowdown, and both the errors and the slowdown disappear when compression is disabled on these queues.

Initially this caused the JVM heap to fill up because CommunicateAction objects were not freed from the heap, but after backporting the patch in https://issues.apache.org/jira/browse/NIFI-12532 these objects are freed. Because the errors persisted, I suspected they could be caused by the mixing of various InputStreams with GZIPInputStream and their mismatched .available() methods in StandardLoadBalanceProtocol.java, RecordReaders.java and CompressableRecordReader.java. However, even after I wrapped the streams in a custom stream whose .available() behaves the way GZIPInputStream expects, the issue remained. So far I haven't identified any egregious memory leaks due to this issue (other than the one I patched above), but the significant processing slowdown remains and there seem to be more leftovers than usual in the content repository. What could be causing this, given that the overwhelming majority of queues don't display this behaviour?
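
For reference, the .available() mismatch mentioned above can be reproduced outside NiFi with the small standalone sketch below. It is not NiFi code, and the class and method names (AvailableMismatchDemo, gzipAvailableBeforeAndAfter) are made up for illustration. It only shows the documented JDK behaviour: a plain stream such as ByteArrayInputStream reports the number of remaining bytes, while GZIPInputStream (via InflaterInputStream) reports 1 until end of stream has been reached and 0 afterwards. Whether this is actually related to the errors is an assumption on my part.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AvailableMismatchDemo {

    /** Gzip-compresses the given bytes in memory. */
    public static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        return bos.toByteArray();
    }

    /**
     * Returns a two-element array: available() on a fresh GZIPInputStream,
     * and available() on the same stream after it has been read to the end.
     */
    public static int[] gzipAvailableBeforeAndAfter(byte[] compressed) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            int before = gz.available();
            byte[] buf = new byte[4096];
            // Drain the stream until read() signals end of stream.
            while (gz.read(buf) != -1) {
                // discard decompressed data
            }
            int after = gz.available();
            return new int[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "some flowfile content".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);

        // A plain in-memory stream reports the real number of remaining bytes.
        ByteArrayInputStream plain = new ByteArrayInputStream(compressed);
        System.out.println("plain available:          " + plain.available());

        // The gzip stream only reports "not at EOF yet" (1) or "at EOF" (0).
        int[] result = gzipAvailableBeforeAndAfter(compressed);
        System.out.println("gzip available at start:  " + result[0]);
        System.out.println("gzip available after EOF: " + result[1]);
    }
}
```

Any code that treats available() as a byte count, or as a reliable "more data follows" signal, will therefore see different results depending on whether the stream it was handed is compressed.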

2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut 2024-06-13 13:00:43,132 ERROR [Load-Balance Server Thread-241] o.a.n.c.q.c.s.ConnectionLoadBalanceServer Failed to communicate over Channel /10.233.121.228:6342::/10.233.97.209:51300
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut java.io.IOException: Expected a Data Frame Indicator from Peer nifi-cluster-3-node.nifi-cluster-headless.nifikop.svc.cluster.local but received a value of 3
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.consumeContent(StandardLoadBalanceProtocol.java:575)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFile(StandardLoadBalanceProtocol.java:520)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:259)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:139)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:166)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$AcceptConnection.lambda$run$0(ConnectionLoadBalanceServer.java:275)
2024-06-13 13:00:43,133 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.lang.Thread.run(Unknown Source)

2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut java.io.IOException: Expected a Data Frame Indicator from Peer nifi-cluster-2-node.nifi-cluster-headless.nifikop.svc.cluster.local but received a value of 0
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.consumeContent(StandardLoadBalanceProtocol.java:575)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFile(StandardLoadBalanceProtocol.java:520)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:259)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:139)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:166)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$AcceptConnection.lambda$run$0(ConnectionLoadBalanceServer.java:275)
2024-06-13 13:00:45,341 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.lang.Thread.run(Unknown Source)

2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut 2024-06-13 13:00:45,400 ERROR [Load-Balanced Client Thread-7] o.a.n.c.q.c.c.a.n.NioAsyncLoadBalanceClient Failed to communicate with Peer nifi-cluster-2-node.nifi-cluster-headless.nifikop.svc.cluster.local:8443
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut java.io.IOException: Broken pipe
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.SocketDispatcher.write(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.IOUtil.write(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.IOUtil.write(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/sun.nio.ch.SocketChannelImpl.write(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.client.async.nio.PeerChannel.write(PeerChannel.java:190)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.client.async.nio.LoadBalanceSession.communicate(LoadBalanceSession.java:158)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClient.communicate(NioAsyncLoadBalanceClient.java:265)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask.run(NioAsyncLoadBalanceClientTask.java:81)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
2024-06-13 13:00:45,401 INFO [NiFi logging handler] org.apache.nifi.StdOut     at java.base/java.lang.Thread.run(Unknown Source)

Thank you,

Marios Tsolekas
