[ https://issues.apache.org/jira/browse/BEAM-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573139#comment-16573139 ]

Alexey Romanenko commented on BEAM-4803:
----------------------------------------

[~vivek_17] Yes, as you correctly noticed, the runner should cache readers. It 
uses {{com.google.common.cache.Cache}} internally for that purpose, and this 
cache is created with {{expireAfterAccess(long duration, TimeUnit unit)}}, 
where {{duration = readerCacheInterval}}. 
It seems this is not long enough for microbatches, so cache entries expire very 
quickly. Perhaps we need to rethink this cache policy for streaming mode. 

> Beam spark runner not working properly with kafka
> -------------------------------------------------
>
>                 Key: BEAM-4803
>                 URL: https://issues.apache.org/jira/browse/BEAM-4803
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-spark
>    Affects Versions: 2.4.0, 2.5.0
>            Reporter: Vivek Agarwal
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> We are running a Beam stream processing job on the Spark runner, which reads 
> from a Kafka topic using Kerberos authentication. We are using java-io-kafka 
> v2.4.0 to read from the Kafka topic in the pipeline. The issue is that the 
> KafkaIO client is continuously creating a new Kafka consumer with the 
> specified config, doing a Kerberos login every time. Also, Spark streaming 
> jobs get spawned for the unbounded source every second or so, even when 
> there is no data in the Kafka topic. The log shows these jobs:
> INFO SparkContext: Starting job: dstr...@SparkUnboundedSource.java:172
> We can also see in the logs:
> INFO MicrobatchSource: No cached reader found for split: 
> [org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@2919a728]. Creating new 
> reader at checkpoint mark...
> It then creates a new consumer, doing a fresh Kerberos login each time, which 
> is causing issues.
> We are unsure what the correct behavior should be here and why so many Spark 
> streaming jobs are getting created. We tried the Beam code with the Flink 
> runner and did not see this issue there. Can someone point to the correct 
> settings for using an unbounded Kafka source with the Spark runner in Beam? 
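For context, a minimal sketch of the kind of KafkaIO read described above, with Kerberos-related consumer properties. The broker, topic, and property values are illustrative assumptions, not the reporter's actual configuration; the JAAS/keytab setup is typically passed to the executors separately (e.g. via {{java.security.auth.login.config}}).

{code:java}
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaKerberosReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker1:9092")            // illustrative broker
        .withTopic("events")                             // illustrative topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Kerberos/SASL consumer settings; each new consumer created by the
        // runner repeats this login, which is the behaviour reported above.
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            "security.protocol", "SASL_PLAINTEXT",
            "sasl.kerberos.service.name", "kafka"))
        .withoutMetadata());

    p.run().waitUntilFinish();
  }
}
{code}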



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)