Hi,

Let me take a look.

Regards
JB

> On 15 Jan 2021, at 20:43, Rafael Ribeiro <rfribe...@gmail.com> wrote:
>
> Hi all,
>
> I'm facing the problem reported in
> https://issues.apache.org/jira/browse/BEAM-6516
>
> while running a flex template based on kafka-to-bigquery and adapted to RabbitMQ:
>
> java.lang.RuntimeException: Exception while finalizing checkpoint
>     at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.lambda$flushState$0 (StreamingModeExecutionContext.java:403)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$callFinalizeCallbacks$3 (StreamingDataflowWorker.java:1218)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.io.IOException
>     at com.rabbitmq.client.impl.AMQChannel.wrap (AMQChannel.java:129)
>     at com.rabbitmq.client.impl.AMQChannel.wrap (AMQChannel.java:125)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:147)
>     at com.rabbitmq.client.impl.ChannelN.txCommit (ChannelN.java:1540)
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit (AutorecoveringChannel.java:663)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
>     at com.rabbitmq.utility.ValueOrException.getValue (ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue (BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply (AMQChannel.java:502)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc (AMQChannel.java:293)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:141)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
>     at com.rabbitmq.client.impl.ChannelN.asyncShutdown (ChannelN.java:522)
>     at com.rabbitmq.client.impl.ChannelN.processAsync (ChannelN.java:346)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand (AMQChannel.java:182)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame (AMQChannel.java:114)
>     at com.rabbitmq.client.impl.AMQConnection.readFrame (AMQConnection.java:672)
>     at com.rabbitmq.client.impl.AMQConnection.access$300 (AMQConnection.java:48)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run (AMQConnection.java:599)
>
> This happens when I'm receiving messages from a RabbitMQ queue on Dataflow.
>
> I already tried adding "use_deprecated_read" to the parameter list,
> but I'm still seeing many error messages,
> and sometimes a message is duplicated in BQ.
>
> My RabbitMQ queue configuration:
>
> Details
>   Type: topic
>   Features: durable: true
>   Policy:
>
> Does anyone have plans to fix this issue, or is there a workaround?
>
> --
> Rafael Fernando Ribeiro