Hi guys, can no one help with that? Any clue or example pipeline?
Regards,
Rafael Ribeiro

---------- Forwarded message ---------
From: Rafael Ribeiro <[email protected]>
Date: Mon., Jan. 18, 2021, 14:31
Subject: Re: BEAM-6516 - RabbitMqIO.Read - Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
To: Jean-Baptiste Onofre <[email protected]>
Cc: dev <[email protected]>, <[email protected]>

Hi Jean-Baptiste,

I'm sharing my pipeline with you (I'm running it on Dataflow):

public static PipelineResult run(Options options) {
  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);

  // Register the coder for the pipeline
  FailsafeElementCoder<KV<String, String>, String> coder =
      FailsafeElementCoder.of(
          KvCoder.of(
              NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
          NullableCoder.of(StringUtf8Coder.of()));

  CoderRegistry coderRegistry = pipeline.getCoderRegistry();
  coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

  /*
   * Steps:
   *  1) Read messages in from RabbitMQ
   *  2) Transform the messages into TableRows
   *     - Transform message payload via UDF
   *     - Convert UDF result to TableRow objects
   *  3) Write successful records out to BigQuery
   *  4) Write failed records out to BigQuery
   */
  PCollection<RabbitMqMessage> messages =
      pipeline.apply(
          "Read Rabbit queue",
          RabbitMqIO.read()
              .withUri(
                  "amqp://"
                      + options.getRabbitUser()
                      + ":"
                      + options.getRabbitPassword()
                      + "@"
                      + options.getRabbitHost()
                      + ":"
                      + options.getRabbitPort())
              .withQueue(options.getRabbitQueue()));

  final TupleTag<String> dummyTag = new TupleTag<String>() {};

  PCollection<KV<String, String>> rowsProcessed =
      messages.apply(
          "Transform RabbitMqMessage to Json",
          ParDo.of(
              new DoFn<RabbitMqMessage, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  String jsonInString = new String(c.element().getBody());
                  LOG.info(jsonInString);
                  c.output(KV.of("test", jsonInString));
                }
              }));

  PCollectionTuple convertedTableRows =
      rowsProcessed.apply("ConvertMessageToTableRow", new MessageToTableRow(options));

  /*
   * Step #3: Write the successful records out to BigQuery
   */
  WriteResult writeResult =
      convertedTableRows
          .get(TRANSFORM_OUT)
          .apply(
              "WriteSuccessfulRecords",
              BigQueryIO.writeTableRows()
                  .withoutValidation()
                  .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                  .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                  .withExtendedErrorInfo()
                  .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                  .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                  .to(options.getOutputTableSpec()));
  /*
   * Step #3 contd.: Elements that failed inserts into BigQuery are extracted and converted
   * to FailsafeElement.
   */
  PCollection<FailsafeElement<String, String>> failedInserts =
      writeResult
          .getFailedInsertsWithErr()
          .apply(
              "WrapInsertionErrors",
              MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                  .via(RabbitMqToBigQuery::wrapBigQueryInsertError))
          .setCoder(FAILSAFE_ELEMENT_CODER);

  /*
   * Step #4: Write failed records out to BigQuery
   */
  PCollectionList.of(convertedTableRows.get(UDF_DEADLETTER_OUT))
      .and(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
      .apply("Flatten", Flatten.pCollections())
      .apply(
          "WriteTransformationFailedRecords",
          WriteKafkaMessageErrors.newBuilder()
              .setErrorRecordsTable(
                  ObjectUtils.firstNonNull(
                      options.getOutputDeadletterTable(),
                      options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());

  /*
   * Step #5: Insert records that failed BigQuery inserts into a deadletter table.
   */
  failedInserts.apply(
      "WriteInsertionFailedRecords",
      ErrorConverters.WriteStringMessageErrors.newBuilder()
          .setErrorRecordsTable(
              ObjectUtils.firstNonNull(
                  options.getOutputDeadletterTable(),
                  options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
          .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
          .build());

  return pipeline.run();
}
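The Options interface isn't pasted above; it just exposes the RabbitMQ connection settings and the BigQuery table specs used in the code. A rough sketch of what the pipeline assumes is below (the getter names come from the code above; the interface definition itself is only an approximation and may not match the real one):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch only: approximate Options interface for the pipeline above, reconstructed from the
// getters used in run(); the real definition may differ (types, defaults, validation).
public interface Options extends PipelineOptions {
  @Description("RabbitMQ user")
  String getRabbitUser();
  void setRabbitUser(String value);

  @Description("RabbitMQ password")
  String getRabbitPassword();
  void setRabbitPassword(String value);

  @Description("RabbitMQ host")
  String getRabbitHost();
  void setRabbitHost(String value);

  @Description("RabbitMQ port")
  String getRabbitPort();
  void setRabbitPort(String value);

  @Description("RabbitMQ queue to read from")
  String getRabbitQueue();
  void setRabbitQueue(String value);

  @Description("BigQuery output table spec")
  String getOutputTableSpec();
  void setOutputTableSpec(String value);

  @Description("BigQuery deadletter table spec")
  String getOutputDeadletterTable();
  void setOutputDeadletterTable(String value);
}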
On Sat., Jan. 16, 2021 at 02:02, Jean-Baptiste Onofre <[email protected]> wrote:

> Hi,
>
> Let me take a look.
>
> Regards
> JB
>
> On Jan. 15, 2021 at 20:43, Rafael Ribeiro <[email protected]> wrote:
>
> Hi all,
>
> I'm facing the problem reported on
> https://issues.apache.org/jira/browse/BEAM-6516
> while running a flex template based on kafka-to-bigquery and adapted to RabbitMQ:
>
> java.lang.RuntimeException: Exception while finalizing checkpoint
>     at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.lambda$flushState$0 (StreamingModeExecutionContext.java:403)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$callFinalizeCallbacks$3 (StreamingDataflowWorker.java:1218)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run (Thread.java:748)
> Caused by: java.io.IOException
>     at com.rabbitmq.client.impl.AMQChannel.wrap (AMQChannel.java:129)
>     at com.rabbitmq.client.impl.AMQChannel.wrap (AMQChannel.java:125)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:147)
>     at com.rabbitmq.client.impl.ChannelN.txCommit (ChannelN.java:1540)
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit (AutorecoveringChannel.java:663)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
>     at com.rabbitmq.utility.ValueOrException.getValue (ValueOrException.java:66)
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue (BlockingValueOrException.java:36)
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply (AMQChannel.java:502)
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc (AMQChannel.java:293)
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc (AMQChannel.java:141)
> Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
>     at com.rabbitmq.client.impl.ChannelN.asyncShutdown (ChannelN.java:522)
>     at com.rabbitmq.client.impl.ChannelN.processAsync (ChannelN.java:346)
>     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand (AMQChannel.java:182)
>     at com.rabbitmq.client.impl.AMQChannel.handleFrame (AMQChannel.java:114)
>     at com.rabbitmq.client.impl.AMQConnection.readFrame (AMQConnection.java:672)
>     at com.rabbitmq.client.impl.AMQConnection.access$300 (AMQConnection.java:48)
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run (AMQConnection.java:599)
>
> This happens when I'm receiving messages from a RabbitMQ queue on Dataflow.
>
> I already tried adding "use_deprecated_read" to the parameters list, but I'm still seeing many error messages, and sometimes the message is duplicated in BQ.
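> For reference, a minimal sketch of how that experiment can be enabled, assuming the standard Beam ExperimentalOptions API (the command-line equivalent is --experiments=use_deprecated_read); this is only an illustration, not necessarily how my job is configured:
>
> import org.apache.beam.sdk.options.ExperimentalOptions;
> import org.apache.beam.sdk.options.PipelineOptions;
>
> // Sketch: enable the use_deprecated_read experiment programmatically, before Pipeline.create(options).
> // Equivalent to passing --experiments=use_deprecated_read on the command line.
> // The helper name below is just for illustration.
> static void enableDeprecatedRead(PipelineOptions options) {
>   ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");
> }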
> My RabbitMQ queue configuration:
>
> Details
>   Type: topic
>   Features
>     durable: true
>   Policy
>
> Does anyone have plans to fix this issue, or is there any workaround?
>
> --
> Rafael Fernando Ribeiro


--
Rafael Fernando Ribeiro
