Hi Shen,

Can you share which Beam version you are using? I just checked the master code, and the default Kafka client version there is `<kafka.clients.version>0.11.0.1</kafka.clients.version>`. I can't recall how older versions behave, but my application (2.2.0-SNAPSHOT) works with a customized Kafka version based on 0.10.00-SASL.

What you need to do is:

1). exclude the kafka-clients dependency that KafkaIO pulls in, and add your own Kafka client library in pom.xml;
2). add your configuration, like:

```
Map<String, Object> consumerPara = new HashMap<String, Object>();
//consumerPara.put(ConsumerConfig.GROUP_ID_CONFIG, consumerName);
//consumerPara.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
if (secureEnabled) {
  consumerPara.put("sasl.mechanism", "IAF");
  consumerPara.put("security.protocol", "SASL_PLAINTEXT");
  consumerPara.put("sasl.login.class", ".....");
  consumerPara.put("sasl.callback.handler.class", "...");
}

KafkaIO.<byte[], byte[]>read()
    ....
    // pass the map built above to the consumer
    .updateConsumerProperties(consumerPara)
    ....;
```

Mingmin

On Mon, Oct 30, 2017 at 10:21 AM, Raghu Angadi <rang...@google.com.invalid> wrote:

> https://issues.apache.org/jira/browse/BEAM-307
>
> This should be closed.
>
> On Mon, Oct 30, 2017 at 9:00 AM, Lukasz Cwik <lc...@google.com.invalid> wrote:
>
> > There has been some discussion about getting Kafka 0.10.x working on
> > BEAM-307[1].
> >
> > As an immediate way to unblock yourself, modify your local copy of the
> > KafkaIO source to include setting the system property in a static block
> > before the class is loaded or before the Kafka client is instantiated and
> > used.
> >
> > Also consider contributing to the Kafka connector to getting 0.10.x
> > working.
> >
> > 1: https://issues.apache.org/jira/browse/BEAM-307
> >
> > On Mon, Oct 30, 2017 at 8:14 AM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > To use KafkaIO in secure mode, I need to set
> > > -Djava.security.auth.login.config to point to a JAAS configuration file.
> > > It works fine for local execution. But how can I configure the
> > > "java.security.auth.login.config" property in the Beam app when the
> > > pipeline is submitted to a cluster/cloud-service? Even if I use a ParDo to
> > > set the system property, there is no guarantee that the ParDo will run on
> > > the same server with the KafkaIO source.
> > >
> > > For this specific problem, it would be helpful to upgrade to Kafka Client
> > > 0.10.2.0, which provides a "sasl.jaas.config" property that can be updated
> > > programmatically. Or is there any other work around?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients
> > >
> > > Thanks,
> > > Shen

--
----
Mingmin
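
P.S. For the "sasl.jaas.config" route mentioned in the original question (KIP-85, Kafka client 0.10.2.0 and later), the consumer properties might look roughly like the sketch below. This is only an illustration under assumptions: the class name `JaasConfigSketch`, the method `secureConsumerConfig`, the PLAIN mechanism, and the credentials are all hypothetical, and the code builds a plain map with string keys so it runs without Kafka or Beam on the classpath. The resulting map would be passed to `updateConsumerProperties(...)` the same way as in the configuration snippet earlier in this message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties that carry the JAAS
// configuration inline, so no -Djava.security.auth.login.config is needed
// on the workers. Requires a Kafka client that supports KIP-85 (0.10.2.0+).
public class JaasConfigSketch {

  public static Map<String, Object> secureConsumerConfig(String user, String password) {
    Map<String, Object> config = new HashMap<>();
    config.put("security.protocol", "SASL_PLAINTEXT");
    // Assumption: the standard PLAIN mechanism; a customized client may use another.
    config.put("sasl.mechanism", "PLAIN");
    // Inline JAAS entry: login module class, control flag, options, trailing ';'.
    config.put(
        "sasl.jaas.config",
        String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            user, password));
    return config;
  }

  public static void main(String[] args) {
    // Placeholder credentials, for illustration only.
    Map<String, Object> config = secureConsumerConfig("alice", "alice-secret");
    System.out.println(config.get("sasl.jaas.config"));
  }
}
```

Because the JAAS entry travels inside the consumer properties, it is serialized with the pipeline and reaches whichever worker runs the KafkaIO source, which is what the system-property approach cannot guarantee.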