[ https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573139#comment-16573139 ]
Alexey Romanenko commented on BEAM-4803:
----------------------------------------

[~vivek_17] Yes, as you correctly noticed, the runner should cache readers. It uses {{com.google.common.cache.Cache}} internally for that purpose, and this cache is created with {{expireAfterAccess(long duration, TimeUnit unit)}}, where {{duration = readerCacheInterval}}. It seems this interval is not long enough for microbatches, so cache entries expire very quickly. Perhaps we need to rethink this cache policy for streaming mode.

> Beam spark runner not working properly with kafka
> -------------------------------------------------
>
>                 Key: BEAM-4803
>                 URL: https://issues.apache.org/jira/browse/BEAM-4803
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-spark
>    Affects Versions: 2.4.0, 2.5.0
>            Reporter: Vivek Agarwal
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> We are running a Beam stream-processing job on a Spark runner, which reads
> from a Kafka topic using Kerberos authentication. We are using io-java-kafka
> v2.4.0 to read from the Kafka topic in the pipeline. The issue is that the
> KafkaIO client continuously creates a new Kafka consumer with the specified
> config, doing a Kerberos login every time. Also, Spark streaming jobs are
> spawned for the unbounded source every second or so, even when there is no
> data in the Kafka topic. The log shows these jobs:
> INFO SparkContext: Starting job: dstr...@sparkunboumdedsource.java:172
> We can also see in the logs:
> INFO MicrobatchSource: No cached reader found for split:
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new
> reader at checkpoint mark...
> It then creates a new consumer with a fresh Kerberos login, which is
> causing issues.
> We are unsure what the correct behavior should be here and why so many
> Spark streaming jobs are being created. We tried the Beam code with the
> Flink runner and did not see this issue there.
> Can someone point to the correct settings for using an unbounded Kafka
> source with the Spark runner in Beam?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
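The expiry behavior described in the comment above can be illustrated with a small toy model. This is not Beam's actual {{MicrobatchSource}} code; it is a minimal, self-contained sketch of Guava's {{expireAfterAccess}} semantics using only the JDK, with hypothetical names ({{ExpireAfterAccessCache}}, {{split-0}}, the 100 ms interval) chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an expire-after-access cache: an entry is evicted once
// `intervalMillis` passes without a read. This mimics the policy the
// Spark runner's reader cache uses; names and values are illustrative.
class ExpireAfterAccessCache<K, V> {
    private final long intervalMillis;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> lastAccess = new HashMap<>();

    ExpireAfterAccessCache(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void put(K key, V value) {
        values.put(key, value);
        lastAccess.put(key, System.currentTimeMillis());
    }

    V getIfPresent(K key) {
        Long last = lastAccess.get(key);
        if (last == null) {
            return null;
        }
        long now = System.currentTimeMillis();
        if (now - last > intervalMillis) {
            // Expired: drop the entry and report a miss, so the caller
            // must create a new reader (and, in KafkaIO's case, perform
            // a fresh Kerberos login).
            values.remove(key);
            lastAccess.remove(key);
            return null;
        }
        lastAccess.put(key, now); // each access refreshes the timer
        return values.get(key);
    }
}

public class ReaderCacheDemo {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical readerCacheInterval of 100 ms.
        ExpireAfterAccessCache<String, String> cache = new ExpireAfterAccessCache<>(100);
        cache.put("split-0", "reader-0");
        System.out.println(cache.getIfPresent("split-0")); // hit: still cached
        Thread.sleep(250);
        // The gap between "microbatches" exceeded the interval, so the
        // entry has expired and the lookup misses.
        System.out.println(cache.getIfPresent("split-0"));
    }
}
```

If the gap between microbatches regularly exceeds {{readerCacheInterval}}, every batch misses the cache, which would match the repeated "No cached reader found for split" log lines reported in the issue.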