Hi,

> With OutputAndTimeBoundedSplittableProcessElementInvoker, the 
> KafkaUnboundedReader.start() is called for every checkpoint and DirectRunner 
> produces a checkpoint every 100 records or every 1 second.

This is not what I am seeing. The following few extracted log messages are 
generated in the KafkaUnboundedReader.start() method:

[INFO ] 2021-05-25 20:55:34.528 [direct-runner-worker] 
beam.sdk.io.kafka.KafkaUnboundedReader - Reader-0: reading from 
psp-reporting-file-notifications-0 starting at offset 0
[INFO ] 2021-05-25 20:55:34.638 [direct-runner-worker] 
beam.sdk.io.kafka.KafkaUnboundedReader - Reader-0: reading from 
psp-reporting-file-notifications-0 starting at offset 0
[INFO ] 2021-05-25 20:55:34.689 [direct-runner-worker] 
beam.sdk.io.kafka.KafkaUnboundedReader - Reader-0: reading from 
psp-reporting-file-notifications-0 starting at offset 0

If you look at the timestamps, you can see that .start() is being called very 
frequently. The lines above are just a few of the many I pulled from the log 
output.
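
To put a number on "very frequently", here is a minimal sketch that computes 
the gap between consecutive timestamped log lines. The class and method names 
are hypothetical, and it assumes the timestamp layout shown in the log extract 
above (yyyy-MM-dd HH:mm:ss.SSS); wrapped continuation lines without a 
timestamp are skipped.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StartCallGaps {
    // Timestamp layout assumed from the log lines quoted in this thread.
    private static final Pattern TIMESTAMP =
        Pattern.compile("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})");
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Returns the gap in milliseconds between consecutive timestamped lines. */
    public static List<Long> gapsMillis(List<String> lines) {
        List<Long> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            Matcher m = TIMESTAMP.matcher(line);
            if (!m.find()) {
                continue; // continuation line of a wrapped log entry
            }
            LocalDateTime current = LocalDateTime.parse(m.group(1), FORMAT);
            if (previous != null) {
                gaps.add(Duration.between(previous, current).toMillis());
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "[INFO ] 2021-05-25 20:55:34.528 [direct-runner-worker]",
            "[INFO ] 2021-05-25 20:55:34.638 [direct-runner-worker]",
            "[INFO ] 2021-05-25 20:55:34.689 [direct-runner-worker]");
        System.out.println(gapsMillis(lines)); // prints [110, 51]
    }
}
```

For the three entries above, the gaps are 110 ms and 51 ms, which is well 
below the one-second bound mentioned for checkpoints.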

Regards,
Serge

On 25 May 2021 at 19:10:21, Boyuan Zhang (boyu...@google.com) wrote:

If you are using neither `withMaxNumRecords` nor `withMaxReadTime`, Beam 2.28.0 
and 2.29.0 will wrap KafkaIO.read() with UnboundedSourceAsSDFWrapperFn and 
execute it as a Splittable DoFn via 
OutputAndTimeBoundedSplittableProcessElementInvoker. The KafkaConsumer is 
created every time KafkaUnboundedReader.start() is called. With 
OutputAndTimeBoundedSplittableProcessElementInvoker, 
KafkaUnboundedReader.start() is called for every checkpoint, and DirectRunner 
produces a checkpoint every 100 records or every 1 second. So it's expected 
that many consumers are created, but they should not exist at the same time, 
because one consumer should be closed before another one is created.
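
As a rough back-of-the-envelope model of the two bounds above (100 records or 
1 second, whichever is reached first), the expected number of start() calls 
per second can be sketched as follows. The class and method names are 
hypothetical and not part of Beam; the model assumes a checkpoint is taken as 
soon as either bound is reached and ignores any per-checkpoint overhead.

```java
public class CheckpointRate {
    /**
     * Estimates checkpoints (and therefore start() calls) per second,
     * assuming a checkpoint happens as soon as either bound is hit.
     *
     * @param recordsPerSecond incoming message throughput
     * @param maxRecords       output bound per checkpoint (100 in this thread)
     * @param maxSeconds       time bound per checkpoint (1 second in this thread)
     */
    public static double expectedStartsPerSecond(
            double recordsPerSecond, int maxRecords, double maxSeconds) {
        double byRecords = recordsPerSecond / maxRecords; // record bound
        double byTime = 1.0 / maxSeconds;                 // time bound
        return Math.max(byRecords, byTime);
    }

    public static void main(String[] args) {
        // Idle topic: only the time bound applies.
        System.out.println(expectedStartsPerSecond(0, 100, 1.0));    // prints 1.0
        // Busy topic: the record bound dominates.
        System.out.println(expectedStartsPerSecond(5000, 100, 1.0)); // prints 50.0
    }
}
```

Under these assumptions, an idle topic would see about one start() call per 
second, and the rate would only grow with message throughput.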

On Tue, May 25, 2021 at 9:05 AM Alexey Romanenko <aromanenko....@gmail.com> 
wrote:

On 24 May 2021, at 10:43, Sozonoff Serge <se...@sozonoff.com> wrote:

OK, thanks. Just to clarify: in my case the message throughput is zero when I 
start the Beam pipeline, and it will still crash once all file handles are 
consumed, even if I don't send a single message to the Kafka topic.

This sounds like a bug to me, even if it happens only with DirectRunner. Would 
you mind providing the pipeline code and the command you run that reproduce 
this issue?

—
Alexey


Thanks,
Serge

On 24 May 2021 at 10:14:33, Jan Lukavský (je...@seznam.cz) wrote:

It is not 100 consumers; the checkpoint is created every 100 records. So, if 
your message throughput is high enough, the consumers might be created really 
often. But most importantly, DirectRunner is really not intended for 
performance-sensitive applications. You should use a different runner for that.

Best,

 Jan

On 5/24/21 10:03 AM, Sozonoff Serge wrote:
Hi Jan,

So if I read your SO answer correctly, and looking at the GitHub link you 
provided, we are talking about ~100 consumers? Since I am developing locally 
with a dockerized minimal Kafka broker, it is possible that this is enough to 
hit the max open files limit.

Depending on your definition of “limited”, I would say there are more than a 
limited number present at the same time. If you look at the log extract below, 
every one of those “Kafka version: 2.5.0” lines corresponds to a Kafka consumer 
instantiation, and that’s within a very short period of time!

Thanks,
Serge



[INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.668 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.725 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.730 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.776 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.782 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.863 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.876 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.935 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.940 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.976 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:49.979 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.026 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.038 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.107 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.130 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.165 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.169 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.201 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.205 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.261 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.276 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.339 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.343 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.375 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.378 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.409 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.417 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.498 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.509 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.559 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.562 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.589 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.591 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.624 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.628 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.693 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.704 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.775 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.778 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.806 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.808 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.862 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.870 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.940 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.950 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.988 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:50.990 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.018 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.020 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.046 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.048 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.077 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.083 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.156 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.167 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.226 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.232 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0





On 24 May 2021 at 09:35:50, Jan Lukavský (je...@seznam.cz) wrote:

Hi Serge,

I posted an answer to the SO question; I hope that helps. One question: 
frequent creation of consumers is to be expected with DirectRunner, but only a 
limited number of them should exist at a time. Do you see many of them present 
simultaneously? Or are they correctly closed and released?
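
One way to check this from inside the JVM is to sample the process's open file 
descriptor count while the pipeline runs. The following is a minimal sketch 
with a hypothetical class name; it relies on the JDK's 
com.sun.management.UnixOperatingSystemMXBean, which is only available on 
Unix-like platforms, and returns -1 elsewhere. A steadily growing count would 
suggest that consumers are not being released.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class OpenFdProbe {
    /** Returns this JVM's open file descriptor count, or -1 if unavailable. */
    public static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                .getOpenFileDescriptorCount();
        }
        return -1L; // platform does not expose the counter
    }

    public static void main(String[] args) throws InterruptedException {
        // Take a few samples; in a real run this would be called from a
        // background thread started alongside the pipeline.
        for (int i = 0; i < 3; i++) {
            System.out.println("open file descriptors: " + openFileDescriptors());
            Thread.sleep(500);
        }
    }
}
```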

 Jan

On 5/23/21 8:40 AM, Sozonoff Serge wrote:
Hi,

I would like to refer to the following Stack Overflow question I found.

https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer

I have the very same issue when developing my pipeline. Originally the pipeline 
was bounded and would read and process a CSV file whose name came from a 
parameter. After reading up on various patterns for processing a new incoming 
file automatically, I added a KafkaIO read to the front of my pipeline to 
listen for messages which contain the name of a file to be processed, and I 
then pass this on to FileIO, etc. As such, my pipeline is now unbounded.

My pipeline fails using DirectRunner once we have reached the maximum number of 
open files!! Looking at the logging I see a very large number of threads 
(consumers) which seem to be connecting to the Kafka broker which makes no 
sense. I have a topic with a single partition!


So literally hundreds of these. Notice the pool thread numbers and consumer 
client IDs:

[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75, 
groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking to 
LATEST offset of partition my_topic-0
…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor--127, 
groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking to 
LATEST offset of partition my_topic-0
….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41, 
groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking to LATEST 
offset of partition my_topic-0
….
etc ….


So my issue resembles the one described in the Stack Overflow question, and I 
can confirm that switching to the Flink runner resolves the problem, but surely 
there is an explanation? Is there a known bug with DirectRunner?

Kind thanks,
Serge
