Hi Team,

I have a 3-node cluster on HDP 2.3 with Kafka version 0.9.0.
Previously, on HDP 2.2, when Kafka was not Kerberized, I was able to send
messages to a Kafka topic.
Since the upgrade from HDP 2.2 to 2.3, with Kerberos enabled, I am no longer
able to send messages to the topic.

I am able to send messages from the command line with producer.sh, but only
after running kinit manually.
Likewise, the sample code below works if I run kinit before starting it.
But in production I can't run kinit manually.

Is there any property I have to add in the code or in the JAAS file so that
the producer is authenticated automatically and can send messages to the
Kafka topic?
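
For reference, a keytab-based login is normally described by a JAAS entry that uses Krb5LoginModule with useKeyTab=true, so that no ticket cache (and hence no kinit) is needed. The following is only a minimal sketch of that idea done programmatically. The class name KeytabJaasConfig is hypothetical, the principal and keytab path are the ones from the sample code, and it assumes the client looks up a login context named "KafkaClient"; whether the old Scala producer shipped with HDP 2.3 honours this has not been verified here.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

// Hypothetical helper: an in-memory JAAS configuration that logs in from a keytab.
final class KeytabJaasConfig extends Configuration {

    // Assumption: the Kafka client asks JAAS for a context with this name.
    static final String CONTEXT_NAME = "KafkaClient";

    private final AppConfigurationEntry[] entries;

    KeytabJaasConfig(String principal, String keytabPath) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");       // take the key from the keytab file
        options.put("keyTab", keytabPath);
        options.put("principal", principal);
        options.put("storeKey", "true");        // keep the key in the Subject
        options.put("useTicketCache", "false"); // do not depend on a prior kinit
        options.put("doNotPrompt", "true");     // never ask for a password
        entries = new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options)
        };
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Only the one context is known; null tells JAAS there is no such entry.
        return CONTEXT_NAME.equals(name) ? entries : null;
    }

    public static void main(String[] args) {
        // Install the configuration before any Kerberos login is attempted.
        Configuration.setConfiguration(
            new KeytabJaasConfig("ctadmin", "/etc/security/keytabs/ctadmin.keytab"));
        AppConfigurationEntry entry =
            Configuration.getConfiguration().getAppConfigurationEntry(CONTEXT_NAME)[0];
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
    }
}
```

The same options can be written as a KafkaClient section in the JAAS file referenced by java.security.auth.login.config; the programmatic form is used here only so that the sketch is self-contained.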

I have followed this blog:
http://henning.kropponline.de/2015/11/15/kafka-security-with-kerberos/

Below is my sample code:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {
        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";

        // Log in from the keytab (HscaleSecurityUtil is a helper in this package, not shown).
        HscaleSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        // Kerberos and JAAS settings for the JVM.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config",
                "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        String broker = "hscale-dev1-dn4:6667";

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + broker);

            // Producer settings for the Kerberized broker.
            props.put("metadata.broker.list", broker);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");
            props.put("producer.type", "async");
            props.put("sasl.kerberos.service.name", "kafka");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");

            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            // Send a few test messages keyed by a random IP address.
            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();
        }
    }
}
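
One detail that may be worth checking in the code above: the Kerberos and JAAS system properties are set after the keytab login call. The JDK generally reads java.security.krb5.conf and java.security.auth.login.config on first use and keeps the result, so values set later may not be picked up. A minimal sketch of setting them first, under that assumption, is below. KerberosJvmSetup is a hypothetical name, and the paths are the ones used in the sample code.

```java
// Hypothetical helper: apply the JVM-wide security settings before any login.
final class KerberosJvmSetup {

    static final String KRB5_CONF = "/etc/krb5.conf";
    static final String JAAS_CONF = "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf";

    // Call this as the first statement of main(), before any Kerberos or
    // Kafka class has a chance to read and cache the configuration.
    static void apply() {
        System.setProperty("java.security.krb5.conf", KRB5_CONF);
        System.setProperty("java.security.auth.login.config", JAAS_CONF);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
    }

    public static void main(String[] args) {
        apply();
        // The keytab login and the producer creation would follow here.
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

Passing the same values as -D options on the java command line has the same effect and avoids the ordering question altogether.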


I have posted the same question on Stack Overflow as well:
http://stackoverflow.com/questions/35934578/kafka-java-producer-with-kerberos

Please let me know if anyone needs more details.

Thanks,
------------------------------------------------
Kalpesh Jadhav
Sr. Software Engineer | Development
CitiusTech Inc.
www.citiustech.com






