Thanks for updating the Stack Overflow thread.

I just filed the JIRA here: https://issues.apache.org/jira/browse/BEAM-11328
to track progress.

On Mon, Nov 23, 2020 at 4:10 PM Rafael Ribeiro <rfribe...@gmail.com> wrote:

> @Boyuan Zhang you made my day.
>
> The workaround worked like a charm.
>
> Could we raise an issue on JIRA to fix this?
> I also posted the answer on Stack Overflow for anyone else who might have
> the same problem:
>
> https://stackoverflow.com/questions/64947718/apache-beam-rabbitmq-read-fail-ack-message-and-exception-raised/64978419#64978419
>
> Thanks a lot for your help.
>
>
> On Mon, Nov 23, 2020 at 7:55 PM Boyuan Zhang <boyu...@google.com>
> wrote:
>
>> As a workaround, you can add --experiments=use_deprecated_read when
>> launching your pipeline to bypass the SDF unbounded source wrapper here.
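>>
>> For example, a minimal sketch of enabling that experiment (assuming a
>> standard PipelineOptionsFactory setup; the options wiring below is only
>> illustrative, the flag name itself comes from Beam):
>>
>>     // either pass the flag on the command line when launching:
>>     //   --experiments=use_deprecated_read
>>     // or add it programmatically before creating the pipeline:
>>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>     ExperimentalOptions.addExperiment(
>>         options.as(ExperimentalOptions.class), "use_deprecated_read");
>>     Pipeline p = Pipeline.create(options);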
>>
>> On Mon, Nov 23, 2020 at 2:52 PM Boyuan Zhang <boyu...@google.com> wrote:
>>
>>> Hi Rafael,
>>>
>>> As you mentioned, with withMaxNumRecords the unbounded source is
>>> executed as a bounded one, which may not be ideal for you.
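>>>
>>> For reference, a minimal sketch of what I mean, based on the read in your
>>> code (the URI, queue name, and record cap are placeholders): capping the
>>> source with withMaxNumRecords makes Beam treat it as a bounded read, so
>>> the pipeline drains that many records and then finishes instead of
>>> streaming:
>>>
>>>     PCollection<RabbitMqMessage> messages = p.apply("Read RabbitMQ queue",
>>>         RabbitMqIO.read()
>>>             .withUri("amqp://guest:guest@localhost:5672")
>>>             .withQueue("cart.idle.process")
>>>             // bounded: read at most 100 records, then the source completes
>>>             .withMaxNumRecords(100));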
>>>
>>> It looks like a bug in the direct runner and the SDF unbounded source
>>> wrapper when doing finalizeCheckpoint. Would you like to file a JIRA for
>>> this problem?
>>>
>>> On Mon, Nov 23, 2020 at 2:28 PM Rafael Ribeiro <rfribe...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>>
>>>>> I'm implementing a pipeline to read from a RabbitMQ queue.
>>>>>
>>>>> I'm having problems when I read it as an unbounded stream:
>>>>> it says the channel is already closed, the ack is not sent to
>>>>> RabbitMQ, and the message stays on the queue:
>>>>> ----------------------------------------------------------------
>>>>> WARNING: Failed to finalize
>>>>> Finalization{expiryTime=2020-11-21T19:33:14.909Z,
>>>>> callback=org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$$Lambda$378/0x00000001007ee440@4ae82af9}
>>>>> for completed bundle CommittedImmutableListBundle{PCollection=Read 
>>>>> RabbitMQ
>>>>> queue/Read(RabbitMQSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.out
>>>>> [PCollection],
>>>>> key=org.apache.beam.repackaged.direct_java.runners.local.StructuralKey$CoderStructuralKey@3607f949,
>>>>> elements=[ValueInGlobalWindow{value=ComposedKeyedWorkItem{key=[-55, 41,
>>>>> -123, 97, 13, 104, 92, 61, 92, 122, -19, 112, -90, 16, 7, -97, 89, 107,
>>>>> -80, 12, 9, 120, 10, -97, 72, 114, -62, -105, 101, -34, 96, 48, 30, -96, 
>>>>> 8,
>>>>> -19, 23, -115, -9, 87, 1, -58, -127, 70, -59, -24, -40, -111, -63, -119,
>>>>> 51, -108, 126, 64, -4, -120, -41, 9, 56, -63, -18, -18, -1, 17, -82, 90,
>>>>> -32, 110, 67, -12, -97, 10, -107, -110, 13, -74, -47, -113, 122, 27, 52,
>>>>> 46, -111, -118, -8, 118, -3, 20, 71, -109, 65, -87, -94, 107, 114, 116,
>>>>> -110, -126, -79, -123, -67, 18, -33, 70, -100, 9, -81, -65, -2, 98, 33,
>>>>> -122, -46, 23, -103, -70, 79, -23, 74, 9, 5, -9, 65, -33, -52, 5, 9, 101],
>>>>> elements=[], timers=[TimerData{timerId=1:1605986594072, timerFamilyId=,
>>>>> namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@4958d651),
>>>>> timestamp=2020-11-21T19:23:14.072Z,
>>>>> outputTimestamp=2020-11-21T19:23:14.072Z, domain=PROCESSING_TIME}]},
>>>>> pane=PaneInfo.NO_FIRING}], minimumTimestamp=-290308-12-21T19:59:05.225Z,
>>>>> synchronizedProcessingOutputWatermark=2020-11-21T19:23:14.757Z}
>>>>> com.rabbitmq.client.AlreadyClosedException: channel is already closed
>>>>> due to clean channel shutdown; protocol method:
>>>>> #method<channel.close>(reply-code=200, reply-text=OK, class-id=0,
>>>>> method-id=0)
>>>>>         at
>>>>> com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
>>>>>         at
>>>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
>>>>>         at
>>>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
>>>>>         at
>>>>> com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
>>>>>         at
>>>>> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428)
>>>>>         at
>>>>> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:433)
>>>>>         at
>>>>> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:195)
>>>>>         at
>>>>> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:287)
>>>>>         at
>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:189)
>>>>>         at
>>>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:126)
>>>>>         at
>>>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>>>         at
>>>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>>>         at
>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>>         at
>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>         at java.base/java.lang.Thread.run(Thread.java:834)
>>>>>   ----------------------------------------------------------------
>>>>> BUT
>>>>> if I include withMaxNumRecords,
>>>>> I receive the message and the ack is sent to the RabbitMQ queue,
>>>>> but it works as bounded data.
>>>>>
>>>>>   ----------------------------------------------------------------
>>>>> CODE
>>>>> My code is like the below:
>>>>>
>>>>>     Pipeline p = Pipeline.create(options);
>>>>>
>>>>>     PCollection<RabbitMqMessage> messages = p.apply("Read RabbitMQ queue",
>>>>>         RabbitMqIO.read()
>>>>>             .withUri("amqp://guest:guest@localhost:5672")
>>>>>             .withQueue("cart.idle.process")
>>>>>             //.withMaxNumRecords(1)  // TRANSFORMS TO BOUNDED
>>>>>         );
>>>>>
>>>>>     PCollection<TableRow> rows = messages.apply("Transform Json to TableRow",
>>>>>         ParDo.of(new DoFn<RabbitMqMessage, TableRow>() {
>>>>>
>>>>>           @ProcessElement
>>>>>           public void processElement(ProcessContext c) throws Exception {
>>>>>             ObjectMapper objectMapper = new ObjectMapper();
>>>>>             String jsonInString = new String(c.element().getBody());
>>>>>             LOG.info(jsonInString);
>>>>>             // emit the parsed payload so the BigQuery write below gets rows
>>>>>             c.output(objectMapper.readValue(jsonInString, TableRow.class));
>>>>>           }
>>>>>         }));
>>>>>
>>>>>     rows.apply(
>>>>>         "Write to BigQuery",
>>>>>         BigQueryIO.writeTableRows()
>>>>>             .to("livelo-analytics-dev:cart_idle.cart_idle_process")
>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>
>>>>> Could someone help with this?
>>>>> --
>>>>> Rafael Fernando Ribeiro
>>>>>
>>>>
>
> --
> Rafael Fernando Ribeiro
>
