Is anyone else seeing this issue? Is it related to the environment, or is it
the code? The producer works fine when run with secure=false (no security).


pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic

[2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties)

Hare Krishna

[2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:

> Thanks Joe, I no longer see any out-of-memory error. Now I get an exception
> when the producer fetches metadata for a topic.
>
> Here is how I created the topic and ran the producer:
>
> pdeshmukh$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic secureTopic
> Created topic "secureTopic".
>
> pdeshmukh$ bin/kafka-topics.sh --list --zookeeper localhost:2181
> secure.test
> secureTopic
>
> Run producer (tried both localhost:9092:true and localhost:9092):
>
> pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic
>
> [2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
>
> Hare Krishna
>
> [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:381)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
> [2014-07-18 13:12:45,258] ERROR fetching topic metadata for topics [Set(secureTopic)] from broker [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(secureTopic)] from broker [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:381)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>     ... 12 more
>
> [2014-07-18 13:12:45,337] WARN Fetching topic metadata with correlation id 1 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
>
> [2014-07-18 13:12:46,282] ERROR Failed to send requests for topics secureTopic with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
>
> [2014-07-18 13:12:46,283] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
>
>
> On Fri, Jul 18, 2014 at 11:56 AM, Joe Stein <joe.st...@stealth.ly> wrote:
>
>> Hi Pramod,
>>
>> Can you please increase KAFKA_HEAP_OPTS to, let's say, -Xmx1G in
>> kafka-console-producer.sh to see if that gets you further along in your
>> testing?
>>
>> Thanks!
>>
>> /*******************************************
>>  Joe Stein
>>  Founder, Principal Consultant
>>  Big Data Open Source Security LLC
>>  http://www.stealth.ly
>>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> ********************************************/
>>
>>
>> On Fri, Jul 18, 2014 at 10:24 AM, Pramod Deshmukh <dpram...@gmail.com>
>> wrote:
>>
>> > Hello Raja/Joe,
>> > When I turn on security, I still get an out-of-memory error on the
>> > producer. Is this something to do with the keys? Is there any other way I
>> > can connect to the broker?
>> >
>> > *producer log*
>> > [2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive)
>> > java.lang.OutOfMemoryError: Java heap space
>> >
>> > *broker log*
>> >
>> > INFO begin ssl handshake for localhost/127.0.0.1:50199//127.0.0.1:9092
>> >
>> > On Thu, Jul 17, 2014 at 6:07 PM, Pramod Deshmukh <dpram...@gmail.com>
>> > wrote:
>> >
>> > > Correct, I don't see any exceptions when I turn off security. The
>> > > consumer is able to consume the message.
>> > >
>> > > I still see the warning for the topic property.
>> > >
>> > > [2014-07-17 18:04:38,360] WARN Property topic is not valid
>> > > (kafka.utils.VerifiableProperties)
>> > >
>> > > On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango <rela...@salesforce.com>
>> > > wrote:
>> > >
>> > >> Can you try turning off security to check whether this error happens
>> > >> only in secure mode?
>> > >>
>> > >> Thanks,
>> > >> Raja.
>> > >>
>> > >> On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh <dpram...@gmail.com>
>> > >> wrote:
>> > >>
>> > >> > Thanks Raja, it was helpful.
>> > >> >
>> > >> > Now I am able to start ZooKeeper and the broker in secure mode, ready
>> > >> > for the SSL handshake. I get *java.lang.OutOfMemoryError: Java heap
>> > >> > space* on the producer.
>> > >> >
>> > >> > I am using the default configuration and keystore. Is there anything
>> > >> > missing?
>> > >> >
>> > >> > *Start broker:*
>> > >> >
>> > >> > *bin/kafka-server-start.sh config/server.properties*
>> > >> >
>> > >> > *broker.log:*
>> > >> >
>> > >> > [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
>> > >> > [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' (kafka.log.LogManager)
>> > >> > [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log secure.test-0. (kafka.log.Log)
>> > >> > [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log end offset 0 (kafka.log.Log)
>> > >> > [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
>> > >> > [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
>> > >> > [2014-07-17 15:34:46,614] INFO Initializing secure authentication (kafka.network.security.SecureAuth$)
>> > >> > [2014-07-17 15:34:46,678] INFO Secure authentication initialization has been successfully completed (kafka.network.security.SecureAuth$)
>> > >> > [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
>> > >> > [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
>> > >> > [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
>> > >> > [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
>> > >> > [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0 with address 10.1.100.130:9092. (kafka.utils.ZkUtils$)
>> > >> > [2014-07-17 15:34:47,059] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
>> > >> > *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started (kafka.server.KafkaServer)*
>> > >> > *[2014-07-17 15:34:47,383] INFO begin ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)*
>> > >> > *[2014-07-17 15:34:47,392] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)*
>> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)*
>> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)*
>> > >> > *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)*
>> > >> > *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)*
>> > >> > *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [secure.test,0] (kafka.server.ReplicaFetcherManager)*
>> > >> > [2014-07-17 15:37:15,970] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:16,075] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51690//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:16,434] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51691//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:16,530] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51692//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:16,743] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51693//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:16,834] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51694//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:17,043] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51695//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:17,137] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51696//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> > [2014-07-17 15:37:17,342] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51697//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
>> > >> >
>> > >> >
>> > >> > *Start producer*
>> > >> >
>> > >> > *bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic secure.test*
>> > >> >
>> > >> > *producer.log:*
>> > >> >
>> > >> > bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic secure.test
>> > >> >
>> > >> > [2014-07-17 15:37:46,889] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
>> > >> >
>> > >> > Hello Secure Kafka
>> > >> >
>> > >> > *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive)*
>> > >> > *java.lang.OutOfMemoryError: Java heap space*
>> > >> >     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>> > >> >     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>> > >> >     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>> > >> >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>> > >> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> > >> >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> > >> >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>> > >> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>> > >> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>> > >> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>> > >> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>> > >> >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>> > >> >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>> > >> >     at kafka.utils.Utils$.swallow(Utils.scala:172)
>> > >> >     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>> > >> >     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>> > >> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>> > >> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>> > >> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>> > >> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>> > >> >     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>> > >> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>> > >> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango <rela...@salesforce.com>
>> > >> > wrote:
>> > >> >
>> > >> > > Pramod,
>> > >> > >
>> > >> > > I presented secure Kafka configuration and usage at the last meetup,
>> > >> > > so I hope this video recording
>> > >> > > <http://www.ustream.tv/recorded/48396701> will help. You can skip to
>> > >> > > about 59 min to jump to the security talk.
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Raja.
>> > >> > >
>> > >> > > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh <dpram...@gmail.com>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > Hello Joe,
>> > >> > > >
>> > >> > > > Is there a configuration or example I can use to test the Kafka
>> > >> > > > security piece?
>> > >> > > >
>> > >> > > > Thanks,
>> > >> > > >
>> > >> > > > Pramod
>> > >> > > >
>> > >> > > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh <dpram...@gmail.com>
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > > > Thanks Joe,
>> > >> > > > >
>> > >> > > > > This branch works. I was able to proceed. I still had to set the
>> > >> > > > > Scala version to 2.9.2 in kafka-run-class.sh.
>> > >> > > > >
>> > >> > > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly>
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > >> That is a very old branch.
>> > >> > > > >>
>> > >> > > > >> Here is a more up-to-date one:
>> > >> > > > >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477
>> > >> > > > >> (it needs to be updated to the latest trunk; I might have a
>> > >> > > > >> chance to do that next week).
>> > >> > > > >>
>> > >> > > > >> You should be using Gradle now, as per the README.
>> > >> > > > >>
>> > >> > > > >>
>> > >> > > > >>
>> > >> > > > >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh <dpram...@gmail.com>
>> > >> > > > >> wrote:
>> > >> > > > >>
>> > >> > > > >> > Thanks Joe for this,
>> > >> > > > >> >
>> > >> > > > >> > I cloned this branch and tried to run ZooKeeper, but I get:
>> > >> > > > >> >
>> > >> > > > >> > Error: Could not find or load main class
>> > >> > > > >> > org.apache.zookeeper.server.quorum.QuorumPeerMain
>> > >> > > > >> >
>> > >> > > > >> > I see the Scala version is still set to 2.8.0:
>> > >> > > > >> >
>> > >> > > > >> > if [ -z "$SCALA_VERSION" ]; then
>> > >> > > > >> >         SCALA_VERSION=2.8.0
>> > >> > > > >> > fi
>> > >> > > > >> >
>> > >> > > > >> > Then I installed sbt and Scala and followed your instructions
>> > >> > > > >> > for different Scala versions. I was able to bring ZooKeeper up,
>> > >> > > > >> > but the brokers fail to start with the error:
>> > >> > > > >> >
>> > >> > > > >> > Error: Could not find or load main class kafka.Kafka
>> > >> > > > >> >
>> > >> > > > >> > I think I am doing something wrong. Can you please help me?
>> > >> > > > >> >
>> > >> > > > >> > Our current production setup is on 2.8.0 and we want to stick
>> > >> > > > >> > with it.
>> > >> > > > >> >
>> > >> > > > >> > Thanks,
>> > >> > > > >> >
>> > >> > > > >> > Pramod
>> > >> > > > >> >
>> > >> > > > >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly>
>> > >> > > > >> > wrote:
>> > >> > > > >> >
>> > >> > > > >> > > Hi, I wanted to re-ignite the discussion around Apache Kafka
>> > >> > > > >> > > Security. This is a huge bottleneck (non-starter in some
>> > >> > > > >> > > cases) for a lot of organizations (due to regulatory,
>> > >> > > > >> > > compliance and other requirements). Below are my suggestions
>> > >> > > > >> > > for specific changes in Kafka to accommodate security
>> > >> > > > >> > > requirements. This comes from what folks are doing "in the
>> > >> > > > >> > > wild" to work around and implement security with Kafka as it
>> > >> > > > >> > > is today, and also from what I have discovered from
>> > >> > > > >> > > organizations about their blockers. It also picks up from the
>> > >> > > > >> > > wiki (which I should have time to update later in the week
>> > >> > > > >> > > based on the below and feedback from the thread).
>> > >> > > > >> > >
>> > >> > > > >> > > 1) Transport Layer Security (i.e. SSL)
>> > >> > > > >> > >
>> > >> > > > >> > > This also includes client authentication in addition to the
>> > >> > > > >> > > in-transit security layer. This work has been picked up here
>> > >> > > > >> > > https://issues.apache.org/jira/browse/KAFKA-1477 and I do
>> > >> > > > >> > > appreciate any thoughts, comments, feedback, tomatoes,
>> > >> > > > >> > > whatever for this patch. It is a pickup from the fork of the
>> > >> > > > >> > > work first done here
>> > >> > > > >> > > https://github.com/relango/kafka/tree/kafka_security.
>> > >> > > > >> > >
>> > >> > > > >> > > 2) Data encryption at rest.
>> > >> > > > >> > >
>> > >> > > > >> > > This is very important and something that can be facilitated
>> > >> > > > >> > > within the wire protocol. It requires an additional map data
>> > >> > > > >> > > structure for the "encrypted [data encryption key]". With
>> > >> > > > >> > > this map (either in your object or in the wire protocol) you
>> > >> > > > >> > > can store the dynamically generated symmetric key (for each
>> > >> > > > >> > > message) and then encrypt the data using that dynamically
>> > >> > > > >> > > generated key. You then encrypt the encryption key with the
>> > >> > > > >> > > public key of each party that is expected to be able to
>> > >> > > > >> > > decrypt the encryption key and then decrypt the message.
>> > >> > > > >> > > Each public-key-encrypted symmetric key (which is now the
>> > >> > > > >> > > "encrypted [data encryption key]") is stored along with the
>> > >> > > > >> > > public key it was encrypted with (so a map of [publicKey] =
>> > >> > > > >> > > encryptedDataEncryptionKey) as a chain. Other patterns can
>> > >> > > > >> > > be implemented, but this is a pretty standard digital
>> > >> > > > >> > > enveloping [0] pattern with only 1 field added. Other
>> > >> > > > >> > > patterns should be able to use that field to do their
>> > >> > > > >> > > implementation too.
>> > >> > > > >> > >
>> > >> > > > >> > > 3) Non-repudiation and long term non-repudiation.
>> > >> > > > >> > >
>> > >> > > > >> > > Non-repudiation is proving data hasn't changed. This is often
>> > >> > > > >> > > (if not always) done with x509 public certificates (chained
>> > >> > > > >> > > to a certificate authority).
>> > >> > > > >> > >
>> > >> > > > >> > > Long term non-repudiation is what happens when the
>> > >> > > > >> > > certificates of the certificate authority are expired (or
>> > >> > > > >> > > revoked) and everything ever signed (ever) with that
>> > >> > > > >> > > certificate's public key then becomes "no longer provable as
>> > >> > > > >> > > ever being authentic". That is where RFC3126 [1] and RFC3161
>> > >> > > > >> > > [2] come in (or worm drives [hardware], etc).
>> > >> > > > >> > >
>> > >> > > > >> > > For either (or both) of these it is an operation of the
>> > >> > > > >> > > encryptor to sign/hash the data (with or without a third
>> > >> > > > >> > > party trusted timestamp of the signing event), encrypt that
>> > >> > > > >> > > with their own private key, and distribute the results
>> > >> > > > >> > > (before and after encrypting if required) along with their
>> > >> > > > >> > > public key. This structure is a bit more complex but
>> > >> > > > >> > > feasible: it is a map of digital signature formats and the
>> > >> > > > >> > > chain of dig sig attestations. The map's key is the method
>> > >> > > > >> > > (i.e. CRC32, PKCS7 [3], XmlDigSig [4]) and its value is a
>> > >> > > > >> > > list of maps whose key is the "purpose" of the signature
>> > >> > > > >> > > (what you're attesting to). As a sibling field to the list
>> > >> > > > >> > > there is another field for "the attester" as bytes (e.g.
>> > >> > > > >> > > their PKCS12 [5] for the map of PKCS7 signatures).
>> > >> > > > >> > >
>> > >> > > > >> > > 4) Authorization
>> > >> > > > >> > >
>> > >> > > > >> > > We should have a policy of "404" for data, topics,
>> > >> > > > >> > > partitions (etc) if authenticated connections do not have
>> > >> > > > >> > > access. In "secure mode" any non-authenticated connections
>> > >> > > > >> > > should get a "404" type message on everything. Knowing
>> > >> > > > >> > > "something is there" is a security risk in many use cases.
>> > >> > > > >> > > So if you don't have access you don't even see it. Baking
>> > >> > > > >> > > "that" into Kafka along with some interface for entitlement
>> > >> > > > >> > > (access management) systems (pretty standard) is all that I
>> > >> > > > >> > > think needs to be done to the core project. I want to tackle
>> > >> > > > >> > > this item later in the year, after summer, once the other
>> > >> > > > >> > > three are complete.
>> > >> > > > >> > >
>> > >> > > > >> > > I look forward to thoughts on this and to hearing from
>> > >> > > > >> > > anyone else interested in working with us on these items.
>> > >> > > > >> > >
>> > >> > > > >> > > [0] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/what-is-a-digital-envelope.htm
>> > >> > > > >> > > [1] http://tools.ietf.org/html/rfc3126
>> > >> > > > >> > > [2] http://tools.ietf.org/html/rfc3161
>> > >> > > > >> > > [3] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/pkcs-7-cryptographic-message-syntax-standar.htm
>> > >> > > > >> > > [4] http://en.wikipedia.org/wiki/XML_Signature
>> > >> > > > >> > > [5] http://en.wikipedia.org/wiki/PKCS_12
>> > >> > > > >> > >
>
>
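
A possible reading of the "OOME with size 352518400" line quoted earlier in this thread: the stack trace shows BoundedByteBufferReceive allocating a buffer, and as far as I know the 0.8 client sizes that buffer from a 4-byte big-endian length prefix. Re-encoding 352518400 as four bytes gives 15 03 01 00, which looks like the start of a TLS alert record (content type 21, version 3.1). That would be consistent with the producer parsing SSL bytes as a plaintext Kafka response, in which case a larger heap would not help. Nothing in the thread confirms this diagnosis. The sketch below only does the byte arithmetic; describe_prefix is a made-up helper name, not part of Kafka.

```python
import struct

# TLS record-layer content types (values from the TLS specification).
TLS_CONTENT_TYPES = {
    20: "change_cipher_spec",
    21: "alert",
    22: "handshake",
    23: "application_data",
}


def describe_prefix(size: int) -> dict:
    """Re-encode a suspicious 'response size' as the 4 bytes it was read from.

    Hypothetical helper for illustration only; assumes the size was parsed as
    a signed 32-bit big-endian integer (Java's ByteBuffer.getInt() default).
    """
    raw = struct.pack(">i", size)
    return {
        "hex": raw.hex(),
        # If these bytes were a TLS record header, byte 0 is the content type
        # and bytes 1-2 are the protocol version (3, 1 means TLS 1.0).
        "tls_content_type": TLS_CONTENT_TYPES.get(raw[0], "not a TLS content type"),
        "tls_version": (raw[1], raw[2]),
    }


if __name__ == "__main__":
    # Size taken from the producer log quoted in this thread.
    print(describe_prefix(352518400))
```

If the decoded bytes had not matched a TLS record type, the oversized allocation would more likely point to a different framing problem, so this check is only a first hint about where to look.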
