Hi all,

I'm facing the problem reported in
https://issues.apache.org/jira/browse/BEAM-6516

while running a Flex Template based on kafka-to-bigquery and adapted to
RabbitMQ. The worker logs show:

java.lang.RuntimeException: Exception while finalizing checkpoint
    at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.lambda$flushState$0(StreamingModeExecutionContext.java:403)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$callFinalizeCallbacks$3(StreamingDataflowWorker.java:1218)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:663)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)



This happens when the pipeline receives messages from a RabbitMQ queue on
Dataflow.

I already tried adding "use_deprecated_read" to the parameters list, but I
still see many of these error messages, and sometimes a message is
duplicated in BigQuery.
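
For context, here is my current understanding of the "unknown delivery tag" part of the error, which may not be the whole story for this job. In AMQP 0-9-1, delivery tags are scoped to the channel that delivered the message, so acknowledging a tag on a different channel (for example one created after the reader was restarted) is rejected with 406 PRECONDITION_FAILED and the channel is closed. The sketch below only simulates that rule in plain Java. FakeChannel and its methods are hypothetical names of mine, not the RabbitMQ client API and not Beam code.

```java
import java.util.HashSet;
import java.util.Set;

public class DeliveryTagDemo {

    /** Hypothetical stand-in for a broker-side channel; NOT the RabbitMQ client API. */
    static final class FakeChannel {
        private long nextTag = 1;                        // tags count up from 1 on each channel
        private final Set<Long> unacked = new HashSet<>();
        private boolean open = true;

        /** Simulates one delivery and returns its channel-scoped tag. */
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Simulates an ack; a tag this channel never delivered closes the channel. */
        void ack(long tag) {
            if (!open) {
                throw new IllegalStateException("channel is closed");
            }
            if (!unacked.remove(tag)) {
                open = false;
                throw new IllegalStateException(
                        "PRECONDITION_FAILED - unknown delivery tag " + tag);
            }
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Acks a tag from one channel on another channel and returns the resulting error text. */
    static String ackOnWrongChannel() {
        FakeChannel first = new FakeChannel();
        first.deliver();                 // tag 1 on the first channel
        long tag = first.deliver();      // tag 2 on the first channel

        FakeChannel second = new FakeChannel();  // e.g. a channel opened after a restart
        second.deliver();                // this channel has only delivered tag 1

        try {
            second.ack(tag);             // tag 2 is unknown on the second channel
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(ackOnWrongChannel());
    }
}
```

If something like this is happening at checkpoint finalization, the unacknowledged messages would be redelivered by the broker, which might also explain the duplicates I see in BigQuery. That is only my guess.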

My RabbitMQ queue configuration:

Type: topic
Features: durable: true
Policy: (empty)

Does anyone have plans to fix this issue, or know of a workaround?

-- 
Rafael Fernando Ribeiro
