Hi Shen,

Can you share which Beam version you are using? I just checked the master
code, and the default Kafka client version is
`<kafka.clients.version>0.11.0.1</kafka.clients.version>`.
I cannot recall the usage for older versions, but my application
(2.2.0-SNAPSHOT) works with a customized Kafka version based on
0.10.00-SASL. What you need to do is:
1). exclude the kafka-clients dependency of KafkaIO, and add your own Kafka
client library in pom.xml;
2). add your configuration like:
```
        // Extra consumer settings that KafkaIO passes on to the Kafka client.
        Map<String, Object> consumerPara = new HashMap<String, Object>();
        //consumerPara.put(ConsumerConfig.GROUP_ID_CONFIG, consumerName);
        //consumerPara.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        if (secureEnabled) {
            // Values below are specific to my customized client; use your own.
            consumerPara.put("sasl.mechanism", "IAF");
            consumerPara.put("security.protocol", "SASL_PLAINTEXT");
            consumerPara.put("sasl.login.class", ".....");
            consumerPara.put("sasl.callback.handler.class", "...");
        }

        KafkaIO.<byte[], byte[]>read()
                ....
                .updateConsumerProperties(consumerPara)
                ....;
```
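
If you can move to a Kafka client 0.10.2.0 or later, the "sasl.jaas.config" property mentioned in the quoted message below can go into the same kind of map, so no JVM system property is needed on the workers. Here is a minimal sketch of that, not something I have run against a cluster: it assumes the standard PLAIN mechanism rather than my customized one, and the class name, method name and credentials are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class SecureConsumerConfig {

    // Builds consumer overrides for a SASL-secured cluster.
    // Assumption: the PLAIN mechanism with a username and password; swap in
    // the mechanism and login module that your cluster actually uses.
    public static Map<String, Object> buildConsumerProperties(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // "sasl.jaas.config" is understood by Kafka clients 0.10.2.0 and later (KIP-85).
        // It carries the JAAS entry inline instead of pointing at a JAAS file.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + username + "\" "
                        + "password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical credentials, only to show the resulting entries.
        Map<String, Object> props = buildConsumerProperties("alice", "secret");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The returned map would then be passed to `.updateConsumerProperties(...)` in the same way as `consumerPara` above.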

Mingmin

On Mon, Oct 30, 2017 at 10:21 AM, Raghu Angadi <rang...@google.com.invalid>
wrote:

> >  https://issues.apache.org/jira/browse/BEAM-307
>
> This should be closed.
>
> On Mon, Oct 30, 2017 at 9:00 AM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > There has been some discussion about getting Kafka 0.10.x working on
> > BEAM-307[1].
> >
> > As an immediate way to unblock yourself, modify your local copy of the
> > KafkaIO source to include setting the system property in a static block
> > before the class is loaded or before the Kafka client is instantiated and
> > used.
> >
> > Also consider contributing to the Kafka connector to get 0.10.x
> > working.
> >
> > 1: https://issues.apache.org/jira/browse/BEAM-307
> >
> > On Mon, Oct 30, 2017 at 8:14 AM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > To use KafkaIO in secure mode, I need to set
> > > -Djava.security.auth.login.config to point to a JAAS configuration file.
> > > It works fine for local execution. But how can I configure the
> > > "java.security.auth.login.config" property in the Beam app when the
> > > pipeline is submitted to a cluster/cloud-service? Even if I use a ParDo
> > > to set the system property, there is no guarantee that the ParDo will
> > > run on the same server as the KafkaIO source.
> > >
> > > For this specific problem, it would be helpful to upgrade to Kafka
> > > Client 0.10.2.0, which provides a "sasl.jaas.config" property that can
> > > be updated programmatically. Or is there any other workaround?
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients
> > >
> > > Thanks,
> > > Shen
> > >
> >
>



-- 
----
Mingmin
