Hi all,

I'm facing the problem reported in https://issues.apache.org/jira/browse/BEAM-6516
while running a Flex Template based on kafka-to-bigquery and adapted to RabbitMQ:

java.lang.RuntimeException: Exception while finalizing checkpoint
    at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.lambda$flushState$0(StreamingModeExecutionContext.java:403)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$callFinalizeCallbacks$3(StreamingDataflowWorker.java:1218)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:663)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)

The error occurs when I'm receiving messages from a RabbitMQ queue on Dataflow.
I already tried adding "use_deprecated_read" to the parameters list, but I'm
still seeing many of these error messages, and sometimes a message is
duplicated in BigQuery.
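
For context on the "unknown delivery tag" text: in AMQP a delivery tag is only valid on the channel that delivered the message, so an ack or commit sent on a different (for example reopened) channel is rejected with a 406 channel error. Whether that is exactly what happens inside the Beam reader here is an assumption on my part, not something I have confirmed. The toy Java sketch below only illustrates the scoping rule; `ToyChannel` and `ackOnWrongChannel` are hypothetical names, not the RabbitMQ client API.

```java
import java.util.HashSet;
import java.util.Set;

class DeliveryTagScopeSketch {

    /** Toy stand-in for a broker-side channel; NOT the RabbitMQ client API. */
    static final class ToyChannel {
        private long nextTag = 1;
        private final Set<Long> unacked = new HashSet<>();
        private boolean open = true;

        /** Simulates a delivery: each channel numbers its own deliveries from 1. */
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Simulates an ack; a tag this channel never delivered closes the channel. */
        void ack(long tag) {
            if (!open) {
                throw new IllegalStateException("channel is closed");
            }
            if (!unacked.remove(tag)) {
                open = false;
                throw new IllegalStateException(
                        "PRECONDITION_FAILED - unknown delivery tag " + tag);
            }
        }

        boolean isOpen() {
            return open;
        }
    }

    /** Acks a tag on a channel other than the one that delivered it. */
    static String ackOnWrongChannel() {
        ToyChannel original = new ToyChannel();
        original.deliver();            // tag 1
        long tag = original.deliver(); // tag 2

        // Assumption for illustration: the reader was recreated with a fresh channel.
        ToyChannel reopened = new ToyChannel();
        try {
            reopened.ack(tag);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints "PRECONDITION_FAILED - unknown delivery tag 2"
        System.out.println(ackOnWrongChannel());
    }
}
```

If this is what is going on, it would also fit the duplicates: a message whose ack never reaches its original channel stays unacknowledged, and RabbitMQ redelivers it.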

My RabbitMQ queue configuration:

Details
  Type: topic
Features
  durable: true
Policy

Does anyone have plans to fix this issue, or know of a workaround?

--
Rafael Fernando Ribeiro
