As a workaround, you can add --experiments=use_deprecated_read when launching your pipeline to bypass the SDF unbounded source wrapper in this case.
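In case it is easier to set the flag from code than from the launch command, here is a minimal sketch that makes sure the experiment is present in the argument array before it is handed to PipelineOptionsFactory. The class DeprecatedReadArgs and the method withDeprecatedRead are hypothetical names made up for this example (they are not part of Beam), and the sketch assumes experiments may be given as a comma-separated list in a single --experiments flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Beam): adds the experiment to the launch args. */
final class DeprecatedReadArgs {
  static final String EXPERIMENT = "use_deprecated_read";
  static final String PREFIX = "--experiments=";

  /** Returns a copy of args in which --experiments contains use_deprecated_read. */
  static String[] withDeprecatedRead(String[] args) {
    List<String> out = new ArrayList<>();
    boolean merged = false;
    for (String arg : args) {
      if (!merged && arg.startsWith(PREFIX)) {
        // Merge into the first existing --experiments flag (comma-separated list).
        String value = arg.substring(PREFIX.length());
        if (Arrays.asList(value.split(",")).contains(EXPERIMENT)) {
          out.add(arg);
        } else if (value.isEmpty()) {
          out.add(PREFIX + EXPERIMENT);
        } else {
          out.add(arg + "," + EXPERIMENT);
        }
        merged = true;
      } else {
        out.add(arg);
      }
    }
    if (!merged) {
      // No --experiments flag was given, so add one.
      out.add(PREFIX + EXPERIMENT);
    }
    return out.toArray(new String[0]);
  }

  public static void main(String[] args) {
    String[] launchArgs = withDeprecatedRead(args);
    // In the pipeline's own main(), launchArgs would then be passed to
    // PipelineOptionsFactory.fromArgs(launchArgs).withValidation().create().
    System.out.println(String.join(" ", launchArgs));
  }
}
```

Passing the flag directly on the command line has the same effect; the helper only saves you from having to remember it on every launch.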

On Mon, Nov 23, 2020 at 2:52 PM Boyuan Zhang <[email protected]> wrote:

> Hi Rafael,
>
> As you mentioned, within withMaxNumRecords, the unbounded source will be
> executed as the bounded one. It may not be ideal for you.
>
> It seems like a bug for direct runner and sdf unbounded source wrapper
> when doing finalizeCheckpoint. Do you want to file a JIRA on this problem?
>
> On Mon, Nov 23, 2020 at 2:28 PM Rafael Ribeiro <[email protected]>
> wrote:
>
>>
>> Hi,
>>>
>>> I'm implementing a pipeline to read RabbitMq queue.
>>>
>>> I'm having problems when I read it at unbound stream
>>> it is saying that channel is already closed and ack is not sent to
>>> rabbitmq and message still on the queue:
>>> ----------------------------------------------------------------
>>> WARNING: Failed to finalize
>>> Finalization{expiryTime=2020-11-21T19:33:14.909Z,
>>> callback=org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$$Lambda$378/0x00000001007ee440@4ae82af9}
>>> for completed bundle CommittedImmutableListBundle{PCollection=Read RabbitMQ
>>> queue/Read(RabbitMQSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.out
>>> [PCollection],
>>> key=org.apache.beam.repackaged.direct_java.runners.local.StructuralKey$CoderStructuralKey@3607f949,
>>> elements=[ValueInGlobalWindow{value=ComposedKeyedWorkItem{key=[-55, 41,
>>> -123, 97, 13, 104, 92, 61, 92, 122, -19, 112, -90, 16, 7, -97, 89, 107,
>>> -80, 12, 9, 120, 10, -97, 72, 114, -62, -105, 101, -34, 96, 48, 30, -96, 8,
>>> -19, 23, -115, -9, 87, 1, -58, -127, 70, -59, -24, -40, -111, -63, -119,
>>> 51, -108, 126, 64, -4, -120, -41, 9, 56, -63, -18, -18, -1, 17, -82, 90,
>>> -32, 110, 67, -12, -97, 10, -107, -110, 13, -74, -47, -113, 122, 27, 52,
>>> 46, -111, -118, -8, 118, -3, 20, 71, -109, 65, -87, -94, 107, 114, 116,
>>> -110, -126, -79, -123, -67, 18, -33, 70, -100, 9, -81, -65, -2, 98, 33,
>>> -122, -46, 23, -103, -70, 79, -23, 74, 9, 5, -9, 65, -33, -52, 5, 9, 101],
>>> elements=[], timers=[TimerData{timerId=1:1605986594072, timerFamilyId=,
>>> namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@4958d651),
>>> timestamp=2020-11-21T19:23:14.072Z,
>>> outputTimestamp=2020-11-21T19:23:14.072Z, domain=PROCESSING_TIME}]},
>>> pane=PaneInfo.NO_FIRING}], minimumTimestamp=-290308-12-21T19:59:05.225Z,
>>> synchronizedProcessingOutputWatermark=2020-11-21T19:23:14.757Z}
>>> com.rabbitmq.client.AlreadyClosedException: channel is already closed
>>> due to clean channel shutdown; protocol method:
>>> #method<channel.close>(reply-code=200, reply-text=OK, class-id=0,
>>> method-id=0)
>>>     at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
>>>     at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
>>>     at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
>>>     at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
>>>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428)
>>>     at org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:433)
>>>     at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:195)
>>>     at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:287)
>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:189)
>>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:126)
>>>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>     at java.base/java.lang.Thread.run(Thread.java:834)
>>> ----------------------------------------------------------------
>>> BUT
>>> if I include withMaxNumRecords
>>> I receive the message and ack is sent to rabbitmq queue
>>> but it works as bound data
>>>
>>> ----------------------------------------------------------------
>>> CODE
>>> my code is like below:
>>> Pipeline p = Pipeline.create(options);
>>>
>>> PCollection<RabbitMqMessage> messages = p.apply("Read RabbitMQ queue",
>>>     RabbitMqIO.read()
>>>         .withUri("amqp://guest:guest@localhost:5672")
>>>         .withQueue("cart.idle.process")
>>>         //.withMaxNumRecords(1) // TRANFORM BOUND
>>> );
>>>
>>> PCollection<TableRow> rows = messages.apply("Transform Json to TableRow",
>>>     ParDo.of(new DoFn<RabbitMqMessage, TableRow>() {
>>>
>>>         @ProcessElement
>>>         public void processElement(ProcessContext c) {
>>>
>>>             ObjectMapper objectMapper = new ObjectMapper();
>>>             String jsonInString = new String(c.element().getBody());
>>>             LOG.info(jsonInString);
>>>         }
>>>     }));
>>>
>>> rows.apply(
>>>     "Write to BigQuery",
>>>     BigQueryIO.writeTableRows()
>>>         .to("livelo-analytics-dev:cart_idle.cart_idle_process")
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>> );
>>>
>>> Someone could help on this?
>>> --
>>> Rafael Fernando Ribeiro
>>>
>>
