[ https://issues.apache.org/jira/browse/BEAM-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502336#comment-16502336 ]

Raghu Angadi commented on BEAM-4485:
------------------------------------

Fortunately, each of the two problems you mentioned has a workaround:
 # Access on the local launcher machine: KafkaIO needs this in order to fetch 
the number of partitions for the topic. If you don't have that access, you can 
instead provide the list of partitions explicitly to the reader; see the 
`withTopicPartitions()` API (first half of the sketch below). Dataflow worker 
machines do need access.
  
 # Difficulty with providing key files: this was a limitation in the 
KafkaConsumer configuration. Configuring client credentials in Kafka got much 
better in 0.10.2 with KIP-85 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients].
 It allows setting these parameters in the consumer config itself (second half 
of the sketch below), which makes it simpler to use with Dataflow. KafkaIO 
allows setting pretty much any consumer config.
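
For example, something like this should work. A rough sketch only: the broker 
address, topic, partitions, and SCRAM credentials are placeholders, and 
`updateConsumerProperties` is KafkaIO's hook for passing arbitrary consumer 
properties (renamed `withConsumerConfigUpdates` in later releases):
{noformat}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.TopicPartition;

// KIP-85: credentials go straight into the consumer config; no JAAS file
// or key file has to be staged onto the workers.
Map<String, Object> auth = new HashMap<>();
auth.put("security.protocol", "SASL_SSL");
auth.put("sasl.mechanism", "SCRAM-SHA-256");
auth.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
        + "username=\"alice\" password=\"secret\";");

KafkaIO.Read<byte[], byte[]> read =
    KafkaIO.readBytes()
        .withBootstrapServers("broker1:9093")
        // Partitions are listed explicitly, so the launcher machine never
        // contacts the brokers to fetch topic metadata.
        .withTopicPartitions(Arrays.asList(
            new TopicPartition("my-topic", 0),
            new TopicPartition("my-topic", 1)))
        .updateConsumerProperties(auth);
{noformat}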

Does this help?

> Incredibly difficult to use KafkaIO + TLS + DataflowRunner
> ----------------------------------------------------------
>
>                 Key: BEAM-4485
>                 URL: https://issues.apache.org/jira/browse/BEAM-4485
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Andre
>            Assignee: Raghu Angadi
>            Priority: Minor
>             Fix For: Not applicable
>
>
> When attempting to use KafkaIO.Read with DataflowRunner, I have hit a lot of 
> walls. The brokers need to be accessible both locally and from the Dataflow 
> runner instances. This means that, when using TLS authentication, the 
> keystore/truststore files need to be available locally and on the instances. 
> I programmatically add the files to the pipeline options with
> {noformat}
> // Stage the usual classpath entries plus the TLS key material.
> List<String> filesToStage =
>     PipelineResources.detectClassPathResourcesToStage(IndicatorIngest.class.getClassLoader());
> filesToStage.add("trust.p12");
> filesToStage.add("server.p12");
> options.setFilesToStage(filesToStage); // attach to the pipeline options
> {noformat}
> but even when I do this, the remote file names end up different from the 
> local ones. This means I need to determine the remote file name myself, like 
> this:
> {noformat}
> // Returns the name the file will have once staged on the workers.
> PackageAttributes.forFileToStage(new File(filepath), filepath)
>     .getDestination().getName();
> {noformat}
> but that function is package private, so I need to wrap the call with a 
> custom class in org.apache.beam.runners.dataflow.util. Once I calculate this 
> filename, I can use it to set the ssl.<thing>.location, but this is the wrong 
> location locally, and it needs to be correct both locally and remotely. This 
> means that in my main I need to calculate the local files' remote names, copy 
> them to the local path with the same name, dynamically set the property to 
> this path, and programmatically add these files to be staged so they 
> hopefully have the same name on the worker. KafkaConsumer doesn't seem to 
> provide any other way to specify where to get these keys from.
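> To make the hoops concrete, it ends up looking roughly like this 
> (StagingNames is a hypothetical wrapper I have to add under 
> org.apache.beam.runners.dataflow.util just to reach the package-private call 
> above):
> {noformat}
> // Remote name Dataflow will assign to the staged file.
> String stagedName = StagingNames.stagedName("trust.p12");
> // Copy locally under that same name so one path works on both sides.
> Files.copy(Paths.get("trust.p12"), Paths.get(stagedName),
>     StandardCopyOption.REPLACE_EXISTING);
> // Point the consumer at the shared name and make sure the file is staged.
> consumerProps.put("ssl.truststore.location", stagedName);
> filesToStage.add(stagedName);
> {noformat}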
> My question is: am I supposed to be jumping through all these hoops, or am I 
> doing something (or multiple things) completely wrong?


