Anyone getting this issue. Is it something related to environment or it is the code. Producer works fine when run with secure=false (no security) mode.
pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic [2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties) Hare Krishna [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76) at kafka.producer.SyncProducer.send(SyncProducer.scala:117) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:526) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh <dpram...@gmail.com> wrote: > Thanks Joe, I don't see any Out of memory error. Now I get exception when > Producer fetches metadata for a topic > > Here is how I created the topic and run producer > > pdeshmukh$ bin/kafka-topics.sh --create --zookeeper localhost:2181 > --replication-factor 1 --partitions 1 --topic secureTopic > Created topic "secureTopic". > > pdeshmukh$ bin/kafka-topics.sh --list --zookeeper localhost:2181 > > secure.test > > secureTopic > > >> Run producer, tried both localhost:9092:true and localhost:9092 > > pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true > --topic secureTopic > > [2014-07-18 13:12:29,817] WARN Property topic is not valid > (kafka.utils.VerifiableProperties) > > Hare Krishna > > [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id > 0 for topics [Set(secureTopic)] from broker > [id:0,host:localhost,port:9092,secure:true] failed > (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79) > > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:117) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > at scala.collection.immutable.Stream.foreach(Stream.scala:526) > > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > > [2014-07-18 13:12:45,258] ERROR fetching topic metadata for topics > [Set(secureTopic)] from broker > [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed > (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > [Set(secureTopic)] from broker > [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > > at kafka.utils.Utils$.swallow(Utils.scala:172) > > at kafka.utils.Logging$class.swallowError(Logging.scala:106) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > at scala.collection.immutable.Stream.foreach(Stream.scala:526) > > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > > Caused by: java.io.EOFException: Received -1 when reading from channel, > socket has likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:381) > > at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79) > > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:117) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > > ... 12 more > [2014-07-18 13:12:45,337] WARN Fetching topic metadata with correlation id > 1 for topics [Set(secureTopic)] from broker > [id:0,host:localhost,port:9092,secure:true] failed > (kafka.client.ClientUtils$) > > 2014-07-18 13:12:46,282] ERROR Failed to send requests for topics > secureTopic with correlation ids in [0,8] > (kafka.producer.async.DefaultEventHandler) > > [2014-07-18 13:12:46,283] ERROR Error in handling batch of 1 events > (kafka.producer.async.ProducerSendThread) > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > at scala.collection.immutable.Stream.foreach(Stream.scala:526) > > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > > > > On Fri, Jul 18, 2014 at 11:56 AM, Joe Stein <joe.st...@stealth.ly> wrote: > >> Hi Pramod, >> >> Can you increase KAFKA_HEAP_OPTS to lets say -Xmx1G in the >> kafka-console-producer.sh to see if that gets you further along please in >> your testing? >> >> Thanks! >> >> /******************************************* >> Joe Stein >> Founder, Principal Consultant >> Big Data Open Source Security LLC >> http://www.stealth.ly >> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> >> ********************************************/ >> >> >> On Fri, Jul 18, 2014 at 10:24 AM, Pramod Deshmukh <dpram...@gmail.com> >> wrote: >> >> > Hello Raja/Joe, >> > When I turn on security, i still get out of memory error on producer. Is >> > this something to do with keys? Is there any other way I can connect to >> > broker? >> > >> > *producer log* >> > [2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network. >> > BoundedByteBufferReceive) >> > java.lang.OutOfMemoryError: Java heap space >> > >> > *broker log* >> > >> > INFO begin ssl handshake for localhost/127.0.0.1:50199//127.0.0.1:9092 >> > >> > >> > >> > >> > >> > On Thu, Jul 17, 2014 at 6:07 PM, Pramod Deshmukh <dpram...@gmail.com> >> > wrote: >> > >> > > Correct, I don't see any exceptions when i turn off security. >> Consumer is >> > > able to consume the message. >> > > >> > > I still see warning for topic property. >> > > >> > > [2014-07-17 18:04:38,360] WARN Property topic is not valid >> > > (kafka.utils.VerifiableProperties) >> > > >> > > >> > > >> > > >> > > >> > > On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango < >> > rela...@salesforce.com> >> > > wrote: >> > > >> > >> Can you try with turning off security to check if this error happens >> > only >> > >> on secure mode? >> > >> >> > >> Thanks, >> > >> Raja. >> > >> >> > >> >> > >> >> > >> >> > >> On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh <dpram...@gmail.com >> > >> > >> wrote: >> > >> >> > >> > Thanks Raja, it was helpful >> > >> > >> > >> > Now I am able to start zookeeper and broker in secure mode ready >> for >> > SSL >> > >> > handshake. I get *java.lang.OutOfMemoryError: Java heap space* on >> > >> producer. >> > >> > >> > >> > I using the default configuration and keystore. Is there anything >> > >> missing >> > >> > >> > >> > *Start broker:* >> > >> > >> > >> > *bin/kafka-server-start.sh config/server.properties* >> > >> > >> > >> > >> > >> > >> > >> > *broker.log:* >> > >> > >> > >> > [2014-07-17 15:34:46,281] INFO zookeeper state changed >> (SyncConnected) >> > >> > (org.I0Itec.zkclient.ZkClient) >> > >> > >> > >> > [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' >> > >> > (kafka.log.LogManager) >> > >> > >> > >> > [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in >> log >> > >> > secure.test-0. (kafka.log.Log) >> > >> > >> > >> > [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 >> > with >> > >> log >> > >> > end offset 0 (kafka.log.Log) >> > >> > >> > >> > [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period >> of >> > >> 60000 >> > >> > ms. (kafka.log.LogManager) >> > >> > >> > >> > [2014-07-17 15:34:46,587] INFO Starting log flusher with a default >> > >> period >> > >> > of 9223372036854775807 ms. (kafka.log.LogManager) >> > >> > >> > >> > [2014-07-17 15:34:46,614] INFO Initializing secure authentication >> > >> > (kafka.network.security.SecureAuth$) >> > >> > >> > >> > [2014-07-17 15:34:46,678] INFO Secure authentication initialization >> > has >> > >> > been successfully completed (kafka.network.security.SecureAuth$) >> > >> > >> > >> > [2014-07-17 15:34:46,691] INFO Awaiting socket connections on >> > >> 0.0.0.0:9092 >> > >> > . >> > >> > (kafka.network.Acceptor) >> > >> > >> > >> > [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started >> > >> > (kafka.network.SocketServer) >> > >> > >> > >> > [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar >> is >> > >> not in >> > >> > the classpath (kafka.utils.Mx4jLoader$) >> > >> > >> > >> > [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader >> > >> > (kafka.server.ZookeeperLeaderElector) >> > >> > >> > >> > [2014-07-17 15:34:47,057] INFO Registered broker 0 at path >> > >> /brokers/ids/0 >> > >> > with address 10.1.100.130:9092. (kafka.utils.ZkUtils$) >> > >> > >> > >> > [2014-07-17 15:34:47,059] INFO New leader is 0 >> > >> > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) >> > >> > >> > >> > *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started >> > >> > (kafka.server.KafkaServer)* >> > >> > >> > >> > *[2014-07-17 15:34:47,383] INFO begin ssl handshake for >> > >> > /10.1.100.130:9092//10.1.100.130:51685 >> > >> > <http://10.1.100.130:9092//10.1.100.130:51685> >> > >> > (kafka.network.security.SSLSocketChannel)* >> > >> > >> > >> > *[2014-07-17 15:34:47,392] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 >> > >> > <http://10.1.100.130/10.1.100.130:51685//10.1.100.130:9092> >> > >> > (kafka.network.security.SSLSocketChannel)* >> > >> > >> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 >> > >> > <http://10.1.100.130/10.1.100.130:51685//10.1.100.130:9092> >> > >> > (kafka.network.security.SSLSocketChannel)* >> > >> > >> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for >> > >> > /10.1.100.130:9092//10.1.100.130:51685 >> > >> > <http://10.1.100.130:9092//10.1.100.130:51685> >> > >> > (kafka.network.security.SSLSocketChannel)* >> > >> > >> > >> > *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] >> > >> Removed >> > >> > fetcher for partitions (kafka.server.ReplicaFetcherManager)* >> > >> > >> > >> > *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] >> > >> Added >> > >> > fetcher for partitions List() (kafka.server.ReplicaFetcherManager)* >> > >> > >> > >> > *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] >> > >> Removed >> > >> > fetcher for partitions [secure.test,0] >> > >> > (kafka.server.ReplicaFetcherManager)* >> > >> > >> > >> > [2014-07-17 15:37:15,970] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:16,075] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51690//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:16,434] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51691//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:16,530] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51692//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:16,743] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51693//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:16,834] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51694//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:17,043] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51695//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:17,137] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51696//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > [2014-07-17 15:37:17,342] INFO begin ssl handshake for >> > >> > 10.1.100.130/10.1.100.130:51697//10.1.100.130:9092 >> > >> > (kafka.network.security.SSLSocketChannel) >> > >> > >> > >> > >> > >> > *Start producer* >> > >> > >> > >> > *bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092 >> :true >> > >> > --topic >> > >> > secure.test* >> > >> > >> > >> > >> > >> > *producer.log:* >> > >> > >> > >> > bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true >> > >> --topic >> > >> > secure.test >> > >> > >> > >> > [2014-07-17 15:37:46,889] WARN Property topic is not valid >> > >> > (kafka.utils.VerifiableProperties) >> > >> > >> > >> > Hello Secure Kafka >> > >> > >> > >> > *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400 >> > >> > (kafka.network.BoundedByteBufferReceive)* >> > >> > >> > >> > *java.lang.OutOfMemoryError: Java heap space* >> > >> > >> > >> > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) >> > >> > >> > >> > at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) >> > >> > >> > >> > at >> kafka.network.Receive$class.readCompletely(Transmission.scala:56) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) >> > >> > >> > >> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102) >> > >> > >> > >> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76) >> > >> > >> > >> > at kafka.producer.SyncProducer.send(SyncProducer.scala:117) >> > >> > >> > >> > at >> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >> > >> > >> > >> > at >> > >> > >> > >> >> > >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) >> > >> > >> > >> > at kafka.utils.Utils$.swallow(Utils.scala:172) >> > >> > >> > >> > at kafka.utils.Logging$class.swallowError(Logging.scala:106) >> > >> > >> > >> > at kafka.utils.Utils$.swallowError(Utils.scala:45) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) >> > >> > >> > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:526) >> > >> > >> > >> > at >> > >> > >> > >> > >> > >> >> > >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) >> > >> > >> > >> > at >> > >> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) >> > >> > >> > >> > >> > >> > >> > >> > On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango < >> > >> rela...@salesforce.com> >> > >> > wrote: >> > >> > >> > >> > > Pramod, >> > >> > > >> > >> > > >> > >> > > I presented secure kafka configuration and usage at last meet >> up. So >> > >> hope >> > >> > > this >> > >> > > video recording <http://www.ustream.tv/recorded/48396701>would >> > help. >> > >> You >> > >> > > can skip to about 59 min to jump to security talk. >> > >> > > >> > >> > > Thanks, >> > >> > > Raja. >> > >> > > >> > >> > > >> > >> > > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh < >> > dpram...@gmail.com> >> > >> > > wrote: >> > >> > > >> > >> > > > Hello Joe, >> > >> > > > >> > >> > > > Is there a configuration or example to test Kafka security >> piece? >> > >> > > > >> > >> > > > Thanks, >> > >> > > > >> > >> > > > Pramod >> > >> > > > >> > >> > > > >> > >> > > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh < >> > >> dpram...@gmail.com> >> > >> > > > wrote: >> > >> > > > >> > >> > > > > Thanks Joe, >> > >> > > > > >> > >> > > > > This branch works. I was able to proceed. I still had to set >> > scala >> > >> > > > version >> > >> > > > > to 2.9.2 in kafka-run-class.sh. >> > >> > > > > >> > >> > > > > >> > >> > > > > >> > >> > > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein < >> > joe.st...@stealth.ly> >> > >> > > wrote: >> > >> > > > > >> > >> > > > >> That is a very old branch. >> > >> > > > >> >> > >> > > > >> Here is a more up to date one >> > >> > > > >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 >> > >> (needs to >> > >> > > be >> > >> > > > >> updated to latest trunk might have a chance to-do that next >> > >> week). >> > >> > > > >> >> > >> > > > >> You should be using gradle now as per the README. >> > >> > > > >> >> > >> > > > >> /******************************************* >> > >> > > > >> Joe Stein >> > >> > > > >> Founder, Principal Consultant >> > >> > > > >> Big Data Open Source Security LLC >> > >> > > > >> http://www.stealth.ly >> > >> > > > >> Twitter: @allthingshadoop < >> > >> http://www.twitter.com/allthingshadoop> >> > >> > > > >> ********************************************/ >> > >> > > > >> >> > >> > > > >> >> > >> > > > >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh < >> > >> > dpram...@gmail.com> >> > >> > > > >> wrote: >> > >> > > > >> >> > >> > > > >> > Thanks Joe for this, >> > >> > > > >> > >> > >> > > > >> > I cloned this branch and tried to run zookeeper but I get >> > >> > > > >> > >> > >> > > > >> > Error: Could not find or load main class >> > >> > > > >> > org.apache.zookeeper.server.quorum.QuorumPeerMain >> > >> > > > >> > >> > >> > > > >> > >> > >> > > > >> > I see scala version is still set to 2.8.0 >> > >> > > > >> > >> > >> > > > >> > if [ -z "$SCALA_VERSION" ]; then >> > >> > > > >> > >> > >> > > > >> > SCALA_VERSION=2.8.0 >> > >> > > > >> > >> > >> > > > >> > fi >> > >> > > > >> > >> > >> > > > >> > >> > >> > > > >> > >> > >> > > > >> > Then I installed sbt and scala and followed your >> instructions >> > >> for >> > >> > > > >> different >> > >> > > > >> > scala versions. I was able to bring zookeeper up but >> brokers >> > >> fail >> > >> > to >> > >> > > > >> start >> > >> > > > >> > with error >> > >> > > > >> > >> > >> > > > >> > Error: Could not find or load main class kafka.Kafka >> > >> > > > >> > >> > >> > > > >> > I think I am doing something wrong. Can you please help >> me? >> > >> > > > >> > >> > >> > > > >> > Our current production setup is with 2.8.0 and want to >> stick >> > to >> > >> > it. >> > >> > > > >> > >> > >> > > > >> > Thanks, >> > >> > > > >> > >> > >> > > > >> > Pramod >> > >> > > > >> > >> > >> > > > >> > >> > >> > > > >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein < >> > >> joe.st...@stealth.ly> >> > >> > > > wrote: >> > >> > > > >> > >> > >> > > > >> > > Hi,I wanted to re-ignite the discussion around Apache >> Kafka >> > >> > > > Security. >> > >> > > > >> > This >> > >> > > > >> > > is a huge bottleneck (non-starter in some cases) for a >> lot >> > of >> > >> > > > >> > organizations >> > >> > > > >> > > (due to regulatory, compliance and other requirements). >> > Below >> > >> > are >> > >> > > my >> > >> > > > >> > > suggestions for specific changes in Kafka to accommodate >> > >> > security >> > >> > > > >> > > requirements. This comes from what folks are doing "in >> the >> > >> > wild" >> > >> > > to >> > >> > > > >> > > workaround and implement security with Kafka as it is >> today >> > >> and >> > >> > > also >> > >> > > > >> > what I >> > >> > > > >> > > have discovered from organizations about their >> blockers. It >> > >> also >> > >> > > > >> picks up >> > >> > > > >> > > from the wiki (which I should have time to update later >> in >> > >> the >> > >> > > week >> > >> > > > >> based >> > >> > > > >> > > on the below and feedback from the thread). >> > >> > > > >> > > >> > >> > > > >> > > 1) Transport Layer Security (i.e. SSL) >> > >> > > > >> > > >> > >> > > > >> > > This also includes client authentication in addition to >> > >> > in-transit >> > >> > > > >> > security >> > >> > > > >> > > layer. This work has been picked up here >> > >> > > > >> > > https://issues.apache.org/jira/browse/KAFKA-1477 and do >> > >> > > appreciate >> > >> > > > >> any >> > >> > > > >> > > thoughts, comments, feedback, tomatoes, whatever for >> this >> > >> patch. >> > >> > > It >> > >> > > > >> is a >> > >> > > > >> > > pickup from the fork of the work first done here >> > >> > > > >> > > https://github.com/relango/kafka/tree/kafka_security. >> > >> > > > >> > > >> > >> > > > >> > > 2) Data encryption at rest. >> > >> > > > >> > > >> > >> > > > >> > > This is very important and something that can be >> > facilitated >> > >> > > within >> > >> > > > >> the >> > >> > > > >> > > wire protocol. It requires an additional map data >> structure >> > >> for >> > >> > > the >> > >> > > > >> > > "encrypted [data encryption key]". With this map >> (either in >> > >> your >> > >> > > > >> object >> > >> > > > >> > or >> > >> > > > >> > > in the wire protocol) you can store the dynamically >> > generated >> > >> > > > >> symmetric >> > >> > > > >> > key >> > >> > > > >> > > (for each message) and then encrypt the data using that >> > >> > > dynamically >> > >> > > > >> > > generated key. You then encrypt the encryption key >> using >> > >> each >> > >> > > > public >> > >> > > > >> key >> > >> > > > >> > > for whom is expected to be able to decrypt the >> encryption >> > >> key to >> > >> > > > then >> > >> > > > >> > > decrypt the message. For each public key encrypted >> > symmetric >> > >> > key >> > >> > > > >> (which >> > >> > > > >> > is >> > >> > > > >> > > now the "encrypted [data encryption key]" along with >> which >> > >> > public >> > >> > > > key >> > >> > > > >> it >> > >> > > > >> > > was encrypted with for (so a map of [publicKey] = >> > >> > > > >> > > encryptedDataEncryptionKey) as a chain. Other patterns >> > can >> > >> be >> > >> > > > >> > implemented >> > >> > > > >> > > but this is a pretty standard digital enveloping [0] >> > pattern >> > >> > with >> > >> > > > >> only 1 >> > >> > > > >> > > field added. Other patterns should be able to use that >> > field >> > >> > to-do >> > >> > > > >> their >> > >> > > > >> > > implementation too. >> > >> > > > >> > > >> > >> > > > >> > > 3) Non-repudiation and long term non-repudiation. >> > >> > > > >> > > >> > >> > > > >> > > Non-repudiation is proving data hasn't changed. This is >> > >> often >> > >> > (if >> > >> > > > not >> > >> > > > >> > > always) done with x509 public certificates (chained to a >> > >> > > certificate >> > >> > > > >> > > authority). >> > >> > > > >> > > >> > >> > > > >> > > Long term non-repudiation is what happens when the >> > >> certificates >> > >> > of >> > >> > > > the >> > >> > > > >> > > certificate authority are expired (or revoked) and >> > everything >> > >> > ever >> > >> > > > >> signed >> > >> > > > >> > > (ever) with that certificate's public key then becomes >> "no >> > >> > longer >> > >> > > > >> > provable >> > >> > > > >> > > as ever being authentic". That is where RFC3126 [1] and >> > >> RFC3161 >> > >> > > [2] >> > >> > > > >> come >> > >> > > > >> > > in (or worm drives [hardware], etc). >> > >> > > > >> > > >> > >> > > > >> > > For either (or both) of these it is an operation of the >> > >> > encryptor >> > >> > > to >> > >> > > > >> > > sign/hash the data (with or without third party trusted >> > >> timestap >> > >> > > of >> > >> > > > >> the >> > >> > > > >> > > signing event) and encrypt that with their own private >> key >> > >> and >> > >> > > > >> distribute >> > >> > > > >> > > the results (before and after encrypting if required) >> along >> > >> with >> > >> > > > their >> > >> > > > >> > > public key. This structure is a bit more complex but >> > >> feasible, >> > >> > it >> > >> > > > is a >> > >> > > > >> > map >> > >> > > > >> > > of digital signature formats and the chain of dig sig >> > >> > > attestations. >> > >> > > > >> The >> > >> > > > >> > > map's key being the method (i.e. CRC32, PKCS7 [3], >> > XmlDigSig >> > >> > [4]) >> > >> > > > and >> > >> > > > >> > then >> > >> > > > >> > > a list of map where that key is "purpose" of signature >> > (what >> > >> > your >> > >> > > > >> > attesting >> > >> > > > >> > > too). As a sibling field to the list another field for >> > "the >> > >> > > > >> attester" as >> > >> > > > >> > > bytes (e.g. their PKCS12 [5] for the map of PKCS7 >> > >> signatures). >> > >> > > > >> > > >> > >> > > > >> > > 4) Authorization >> > >> > > > >> > > >> > >> > > > >> > > We should have a policy of "404" for data, topics, >> > partitions >> > >> > > (etc) >> > >> > > > if >> > >> > > > >> > > authenticated connections do not have access. In >> "secure >> > >> mode" >> > >> > > any >> > >> > > > >> non >> > >> > > > >> > > authenticated connections should get a "404" type >> message >> > on >> > >> > > > >> everything. >> > >> > > > >> > > Knowing "something is there" is a security risk in many >> > uses >> > >> > > cases. >> > >> > > > >> So >> > >> > > > >> > if >> > >> > > > >> > > you don't have access you don't even see it. Baking >> "that" >> > >> into >> > >> > > > Kafka >> > >> > > > >> > > along with some interface for entitlement (access >> > management) >> > >> > > > systems >> > >> > > > >> > > (pretty standard) is all that I think needs to be done >> to >> > the >> > >> > core >> > >> > > > >> > project. >> > >> > > > >> > > I want to tackle item later in the year after summer >> after >> > >> the >> > >> > > > other >> > >> > > > >> > three >> > >> > > > >> > > are complete. >> > >> > > > >> > > >> > >> > > > >> > > I look forward to thoughts on this and anyone else >> > >> interested in >> > >> > > > >> working >> > >> > > > >> > > with us on these items. >> > >> > > > >> > > >> > >> > > > >> > > [0] >> > >> > > > >> > > >> > >> > > > >> > > >> > >> > > > >> > >> > >> > > > >> >> > >> > > > >> > >> > > >> > >> > >> > >> >> > >> http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/what-is-a-digital-envelope.htm >> > >> > > > >> > > [1] http://tools.ietf.org/html/rfc3126 >> > >> > > > >> > > [2] http://tools.ietf.org/html/rfc3161 >> > >> > > > >> > > [3] >> > >> > > > >> > > >> > >> > > > >> > > >> > >> > > > >> > >> > >> > > > >> >> > >> > > > >> > >> > > >> > >> > >> > >> >> > >> http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/pkcs-7-cryptographic-message-syntax-standar.htm >> > >> > > > >> > > [4] http://en.wikipedia.org/wiki/XML_Signature >> > >> > > > >> > > [5] http://en.wikipedia.org/wiki/PKCS_12 >> > >> > > > >> > > >> > >> > > > >> > > /******************************************* >> > >> > > > >> > > Joe Stein >> > >> > > > >> > > Founder, Principal Consultant >> > >> > > > >> > > Big Data Open Source Security LLC >> > >> > > > >> > > http://www.stealth.ly >> > >> > > > >> > > Twitter: @allthingshadoop < >> > >> > > http://www.twitter.com/allthingshadoop> >> > >> > > > >> > > ********************************************/ >> > >> > > > >> > > >> > >> > > > >> > >> > >> > > > >> >> > >> > > > > >> > >> > > > > >> > >> > > > >> > >> > > >> > >> > > >> > >> > > >> > >> > > -- >> > >> > > Thanks, >> > >> > > Raja. >> > >> > > >> > >> > >> > >> >> > >> >> > >> >> > >> -- >> > >> Thanks, >> > >> Raja. >> > >> >> > > >> > > >> > >> > >