Hi Jean-Baptiste,

Here is my pipeline, which I'm running on Dataflow:

public static PipelineResult run(Options options) {

// Create the pipeline
Pipeline pipeline = Pipeline.create(options);

// Register the coder for pipeline
FailsafeElementCoder<KV<String, String>, String> coder =
FailsafeElementCoder.of(
KvCoder.of(
NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
NullableCoder.of(StringUtf8Coder.of()));

CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

/*
* Steps:
* 1) Read messages in from RabbitMQ
* 2) Transform the messages into TableRows
* - Transform message payload via UDF
* - Convert UDF result to TableRow objects
* 3) Write successful records out to BigQuery
* 4) Write records that failed transformation to the deadletter table
* 5) Write records that failed BigQuery inserts to the deadletter table
*/

PCollection<RabbitMqMessage> messages = pipeline
.apply("Read Rabbit queue",
RabbitMqIO.read()
.withUri("amqp://" + options.getRabbitUser() + ":" + options.getRabbitPassword()
+ "@" + options.getRabbitHost() + ":" + options.getRabbitPort())
.withQueue(options.getRabbitQueue())
);

final TupleTag<String> dummyTag = new TupleTag<String>(){};

PCollection<KV<String,String>> rowsProcessed = messages.apply(
"Transform RabbitMqMessage to Json",
ParDo.of(new DoFn<RabbitMqMessage, KV<String,String>>() {

@ProcessElement
public void processElement(ProcessContext c) {
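// Decode explicitly instead of relying on the platform default charset.
// Assumes the payload is UTF-8; needs java.nio.charset.StandardCharsets.
String jsonInString = new String(c.element().getBody(), StandardCharsets.UTF_8);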
LOG.info(jsonInString);
c.output(KV.of("test", jsonInString));
}
}));

PCollectionTuple convertedTableRows =
rowsProcessed.apply("ConvertMessageToTableRow", new MessageToTableRow(options));

/*
* Step #3: Write the successful records out to BigQuery
*/
WriteResult writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));

/*
* Step #3 (continued): Elements that failed inserts into BigQuery are
* extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(RabbitMqToBigQuery::wrapBigQueryInsertError))
.setCoder(FAILSAFE_ELEMENT_CODER);

/*
* Step #4: Write failed records out to BigQuery
*/
PCollectionList.of(convertedTableRows.get(UDF_DEADLETTER_OUT))
.and(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteTransformationFailedRecords",
WriteKafkaMessageErrors.newBuilder()
.setErrorRecordsTable(
ObjectUtils.firstNonNull(
options.getOutputDeadletterTable(),
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());

/*
* Step #5: Insert records that failed BigQuery inserts into a deadletter table.
*/
failedInserts.apply(
"WriteInsertionFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ObjectUtils.firstNonNull(
options.getOutputDeadletterTable(),
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());

return pipeline.run();
}
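
On the duplicates in BQ: a redelivered RabbitMQ message carries the same body, so one possible workaround is to derive a stable ID from the payload and deduplicate on it downstream. The sketch below is only an illustration of that idea and is not part of the template; the class name PayloadIds and the method stableId are hypothetical, and it assumes that two messages with identical bodies really are duplicates.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a stable ID from a message payload. */
final class PayloadIds {
  private PayloadIds() {}

  /** Returns the SHA-256 digest of the payload as 64 lowercase hex characters. */
  static String stableId(byte[] payload) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(payload);
      StringBuilder hex = new StringBuilder(hash.length * 2);
      for (byte b : hash) {
        // Mask to an unsigned value so every byte becomes exactly two hex digits.
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is required to be available on every Java platform.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] body = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
    // A redelivered message with the same body maps to the same ID.
    System.out.println(stableId(body).equals(stableId(body.clone())));
  }
}
```

The ID could then serve as the key of the KV emitted by the DoFn in place of the constant "test", so that a later step can drop repeated keys. If the queue can legitimately carry two identical payloads, this would collapse them, so a producer-assigned message ID would be the safer key where one exists.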

On Sat, Jan 16, 2021 at 02:02, Jean-Baptiste Onofre <j...@nanthrax.net>
wrote:

> Hi,
>
> Let me take a look.
>
> Regards
> JB
>
> On Jan 15, 2021, at 20:43, Rafael Ribeiro <rfribe...@gmail.com> wrote:
>
> Hi all,
>
> I'm facing the problem reported in
> https://issues.apache.org/jira/browse/BEAM-6516
> while running a Flex Template based on kafka-to-bigquery and adapted to
> RabbitMQ:
>
> java.lang.RuntimeException: Exception while finalizing checkpoint
>
>     at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.lambda$flushState$0(StreamingModeExecutionContext.java:403)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$callFinalizeCallbacks$3(StreamingDataflowWorker.java:1218)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.IOException
>
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
>     at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:663)
>
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60,
> method-id=80)
>
>     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
>
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
> protocol method: #method<channel.close>(reply-code=406,
> reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60,
> method-id=80)
>
>     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
>     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
>     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
>     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
>
>
> This happens when I receive messages from a RabbitMQ queue on Dataflow.
>
> I already tried adding "use_deprecated_read" to the parameters list, but I
> still see many error messages, and sometimes a message is duplicated in BQ.
>
> My RabbitMQ queue configuration:
>
> Type: topic
> Features: durable: true
> Policy:
>
> Does anyone plan to fix this issue, or is there a workaround?
>
> --
> Rafael Fernando Ribeiro
>
>
>
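
For context on the "unknown delivery tag 2" error quoted above: in AMQP, a delivery tag is only valid on the channel that delivered the message, so acknowledging it on a different channel (or acknowledging it twice) makes the broker close the channel with reply code 406. The toy model below illustrates that rule in plain Java. It is not the RabbitMQ client API and not RabbitMqIO code; the class ToyChannel and its methods are hypothetical. It may match what BEAM-6516 describes if the checkpoint is finalized on a channel other than the one that read the messages, but that has not been confirmed for this job.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (not the RabbitMQ client API) of channel-scoped delivery tags. */
final class ToyChannel {
  private long nextTag = 1;
  private final Set<Long> unacked = new HashSet<>();

  /** Delivers a message and returns its delivery tag, valid only on this channel. */
  long deliver() {
    long tag = nextTag++;
    unacked.add(tag);
    return tag;
  }

  /** Acknowledges a tag; fails if this channel has no such unacknowledged tag. */
  void ack(long tag) {
    if (!unacked.remove(tag)) {
      throw new IllegalStateException(
          "PRECONDITION_FAILED - unknown delivery tag " + tag);
    }
  }

  public static void main(String[] args) {
    ToyChannel reader = new ToyChannel();
    reader.deliver();
    long tag = reader.deliver(); // second delivery on the original channel

    // A channel opened later, e.g. by a reader recreated after a restart.
    ToyChannel reopened = new ToyChannel();
    try {
      reopened.ack(tag); // this channel never delivered the tag
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }

    reader.ack(tag); // succeeds on the channel that delivered it
  }
}
```

In this model the fix is simply to acknowledge on the channel that delivered the message and to do so only once; how that maps onto checkpoint finalization in a Dataflow streaming job is the open question in this thread.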

-- 
Rafael Fernando Ribeiro
