
I got that same error when following the configuration from Raja's
presentation earlier in this thread.  If you'll notice the usage for the, it is slightly different, which is also slightly
different than the scala code for the ConsoleProducer. :)

When I changed this:

bin/ --broker-list n5:9092:true --topic test

to this:

bin/ --broker-list n5:9092 --secure config/ --topic test

I was able to push messages to the topic, although I got a WARN about the
property "topic" not being valid, even though it is required.

Also, the Producer reported this warning to me:

[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context

and the broker gave me this:
[2014-07-23 20:45:24,114] INFO begin ssl handshake for
[2014-07-23 20:45:24,374] INFO finished ssl handshake for
[2014-07-23 20:45:24,493] INFO Closing socket connection to (
[2014-07-23 20:45:24,555] INFO begin ssl handshake for
[2014-07-23 20:45:24,566] INFO finished ssl handshake for

It's like it did the SSL piece twice :)

Subsequent puts to the topic did not exhibit this behavior though:

root@n5[937]:~/kafka_2.10-0-8-2-> bin/
--broker-list n5:9092 --secure
config/ --topic test
[2014-07-23 20:45:17,530] WARN Property topic is not valid
[2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context

Consuming worked with these options:

root@n5[918]:~/kafka_2.10-0-8-2-> bin/
--topic test --zookeeper n5:2181 --from-beginning --security.config.file
^CConsumed 5 messages

I hope that helps!

On Tue, Jul 22, 2014 at 2:10 PM, Pramod Deshmukh <> wrote:

> Anyone getting this issue. Is it something related to environment or it is
> the code. Producer works fine when run with secure=false (no security)
> mode.
> pdeshmukh$ bin/ --broker-list localhost:9092:true
> --topic secureTopic
> [2014-07-18 13:12:29,817] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
> Hare Krishna
> [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(secureTopic)] from broker
> [id:0,host:localhost,port:9092,secure:true] failed
> (kafka.client.ClientUtils$)
> Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
> at$class.readCompletely(Transmission.scala:56)
> at
> at
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh <>
> wrote:
> > Thanks Joe, I don't see any Out of memory error. Now I get exception when
> > Producer fetches metadata for a topic
> >
> > Here is how I created the topic and run producer
> >
> > pdeshmukh$ bin/ --create --zookeeper localhost:2181
> > --replication-factor 1 --partitions 1 --topic secureTopic
> > Created topic "secureTopic".
> >
> > pdeshmukh$ bin/ --list --zookeeper localhost:2181
> >
> > secure.test
> >
> > secureTopic
> >
> > >> Run producer, tried both localhost:9092:true and localhost:9092
> >
> > pdeshmukh$ bin/ --broker-list
> localhost:9092:true
> > --topic secureTopic
> >
> > [2014-07-18 13:12:29,817] WARN Property topic is not valid
> > (kafka.utils.VerifiableProperties)
> >
> > Hare Krishna
> >
> > [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation
> id
> > 0 for topics [Set(secureTopic)] from broker
> > [id:0,host:localhost,port:9092,secure:true] failed
> > (kafka.client.ClientUtils$)
> >
> > Received -1 when reading from channel, socket has
> > likely been closed.
> >
> > at kafka.utils.Utils$.read(Utils.scala:381)
> >
> > at
> >
> >
> > at$class.readCompletely(Transmission.scala:56)
> >
> > at
> >
> >
> > at
> >
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> >
> > at
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> >
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> >
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >
> > at
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> >
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >
> > at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >
> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >
> > at
> >
> > [2014-07-18 13:12:45,258] ERROR fetching topic metadata for topics
> > [Set(secureTopic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed
> > (kafka.utils.Utils$)
> >
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(secureTopic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:9092,secure:true)] failed
> >
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> >
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >
> > at
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >
> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> >
> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >
> > at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >
> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >
> > at
> >
> > Caused by: Received -1 when reading from channel,
> > socket has likely been closed.
> >
> > at kafka.utils.Utils$.read(Utils.scala:381)
> >
> > at
> >
> >
> > at$class.readCompletely(Transmission.scala:56)
> >
> > at
> >
> >
> > at
> >
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> >
> > at
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> >
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> >
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >
> > ... 12 more
> > [2014-07-18 13:12:45,337] WARN Fetching topic metadata with correlation
> id
> > 1 for topics [Set(secureTopic)] from broker
> > [id:0,host:localhost,port:9092,secure:true] failed
> > (kafka.client.ClientUtils$)
> >
> > 2014-07-18 13:12:46,282] ERROR Failed to send requests for topics
> > secureTopic with correlation ids in [0,8]
> > (kafka.producer.async.DefaultEventHandler)
> >
> > [2014-07-18 13:12:46,283] ERROR Error in handling batch of 1 events
> > (kafka.producer.async.ProducerSendThread)
> >
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> >
> > at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >
> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> >
> > at
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >
> > at
> >
> >
> >
> > On Fri, Jul 18, 2014 at 11:56 AM, Joe Stein <>
> wrote:
> >
> >> Hi Pramod,
> >>
> >> Can you increase KAFKA_HEAP_OPTS to lets say -Xmx1G in the
> >> to see if that gets you further along please
> in
> >> your testing?
> >>
> >> Thanks!
> >>
> >> /*******************************************
> >>  Joe Stein
> >>  Founder, Principal Consultant
> >>  Big Data Open Source Security LLC
> >>
> >>  Twitter: @allthingshadoop <>
> >> ********************************************/
> >>
> >>
> >> On Fri, Jul 18, 2014 at 10:24 AM, Pramod Deshmukh <>
> >> wrote:
> >>
> >> > Hello Raja/Joe,
> >> > When I turn on security, i still get out of memory error on producer.
> Is
> >> > this something to do with keys? Is there any other way I can connect
> to
> >> > broker?
> >> >
> >> > *producer log*
> >> > [2014-07-17 15:38:14,186] ERROR OOME with size 352518400
> (
> >> > BoundedByteBufferReceive)
> >> > java.lang.OutOfMemoryError: Java heap space
> >> >
> >> > *broker log*
> >> >
> >> > INFO begin ssl handshake for localhost/
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, Jul 17, 2014 at 6:07 PM, Pramod Deshmukh <>
> >> > wrote:
> >> >
> >> > > Correct, I don't see any exceptions when i turn off security.
> >> Consumer is
> >> > > able to consume the message.
> >> > >
> >> > > I still see warning for topic property.
> >> > >
> >> > > [2014-07-17 18:04:38,360] WARN Property topic is not valid
> >> > > (kafka.utils.VerifiableProperties)
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango <
> >> >>
> >> > > wrote:
> >> > >
> >> > >> Can you try with turning off security to check if this error
> happens
> >> > only
> >> > >> on secure mode?
> >> > >>
> >> > >> Thanks,
> >> > >> Raja.
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh <
> >> >
> >> > >> wrote:
> >> > >>
> >> > >> > Thanks Raja, it was helpful
> >> > >> >
> >> > >> > Now I am able to start zookeeper and broker in secure mode ready
> >> for
> >> > SSL
> >> > >> > handshake. I get *java.lang.OutOfMemoryError: Java heap space* on
> >> > >> producer.
> >> > >> >
> >> > >> > I using the default configuration and keystore. Is there anything
> >> > >> missing
> >> > >> >
> >> > >> > *Start broker:*
> >> > >> >
> >> > >> > *bin/ config/*
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > *broker.log:*
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,281] INFO zookeeper state changed
> >> (SyncConnected)
> >> > >> > (org.I0Itec.zkclient.ZkClient)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0'
> >> > >> > (kafka.log.LogManager)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in
> >> log
> >> > >> > secure.test-0. (kafka.log.Log)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,571] INFO Completed load of log
> secure.test-0
> >> > with
> >> > >> log
> >> > >> > end offset 0 (kafka.log.Log)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period
> >> of
> >> > >> 60000
> >> > >> > ms. (kafka.log.LogManager)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,587] INFO Starting log flusher with a
> default
> >> > >> period
> >> > >> > of 9223372036854775807 ms. (kafka.log.LogManager)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,614] INFO Initializing secure authentication
> >> > >> > ($)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,678] INFO Secure authentication
> initialization
> >> > has
> >> > >> > been successfully completed ($)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,691] INFO Awaiting socket connections on
> >> > >>
> >> > >> > .
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0],
> Started
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar
> >> is
> >> > >> not in
> >> > >> > the classpath (kafka.utils.Mx4jLoader$)
> >> > >> >
> >> > >> > [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader
> >> > >> > (kafka.server.ZookeeperLeaderElector)
> >> > >> >
> >> > >> > [2014-07-17 15:34:47,057] INFO Registered broker 0 at path
> >> > >> /brokers/ids/0
> >> > >> > with address (kafka.utils.ZkUtils$)
> >> > >> >
> >> > >> > [2014-07-17 15:34:47,059] INFO New leader is 0
> >> > >> > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started
> >> > >> > (kafka.server.KafkaServer)*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,383] INFO begin ssl handshake for
> >> > >> > /
> >> > >> > <>
> >> > >> > (*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,392] INFO begin ssl handshake for
> >> > >> >
> >> > >> > <>
> >> > >> > (*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for
> >> > >> >
> >> > >> > <>
> >> > >> > (*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for
> >> > >> > /
> >> > >> > <>
> >> > >> > (*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker
> 0]
> >> > >> Removed
> >> > >> > fetcher for partitions  (kafka.server.ReplicaFetcherManager)*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker
> 0]
> >> > >> Added
> >> > >> > fetcher for partitions List()
> (kafka.server.ReplicaFetcherManager)*
> >> > >> >
> >> > >> > *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker
> 0]
> >> > >> Removed
> >> > >> > fetcher for partitions [secure.test,0]
> >> > >> > (kafka.server.ReplicaFetcherManager)*
> >> > >> >
> >> > >> > [2014-07-17 15:37:15,970] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:16,075] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:16,434] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:16,530] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:16,743] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:16,834] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:17,043] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:17,137] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> > [2014-07-17 15:37:17,342] INFO begin ssl handshake for
> >> > >> >
> >> > >> > (
> >> > >> >
> >> > >> >
> >> > >> > *Start producer*
> >> > >> >
> >> > >> > *bin/ --broker-list
> >> :true
> >> > >> > --topic
> >> > >> > secure.test*
> >> > >> >
> >> > >> >
> >> > >> > *producer.log:*
> >> > >> >
> >> > >> > bin/ --broker-list
> :true
> >> > >> --topic
> >> > >> > secure.test
> >> > >> >
> >> > >> > [2014-07-17 15:37:46,889] WARN Property topic is not valid
> >> > >> > (kafka.utils.VerifiableProperties)
> >> > >> >
> >> > >> > Hello Secure Kafka
> >> > >> >
> >> > >> > *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400
> >> > >> > (*
> >> > >> >
> >> > >> > *java.lang.OutOfMemoryError: Java heap space*
> >> > >> >
> >> > >> > at java.nio.HeapByteBuffer.<init>(
> >> > >> >
> >> > >> > at java.nio.ByteBuffer.allocate(
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >> > >> >
> >> > >> > at
> >>$class.readCompletely(Transmission.scala:56)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> > at
> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> >> > >> >
> >> > >> > at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> >> > >> >
> >> > >> > at
> >> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >> > >> >
> >> > >> > at kafka.utils.Utils$.swallow(Utils.scala:172)
> >> > >> >
> >> > >> > at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >> > >> >
> >> > >> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >> > >> >
> >> > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >> > >> >
> >> > >> > at
> >> > >>
> >>
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango <
> >> > >>>
> >> > >> > wrote:
> >> > >> >
> >> > >> > > Pramod,
> >> > >> > >
> >> > >> > >
> >> > >> > > I presented secure kafka configuration and usage at last meet
> >> up. So
> >> > >> hope
> >> > >> > > this
> >> > >> > > video recording <>would
> >> > help.
> >> > >> You
> >> > >> > > can skip to about 59 min to jump to security talk.
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > > Raja.
> >> > >> > >
> >> > >> > >
> >> > >> > > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh <
> >> >>
> >> > >> > > wrote:
> >> > >> > >
> >> > >> > > > Hello Joe,
> >> > >> > > >
> >> > >> > > > Is there a configuration or example to test Kafka security
> >> piece?
> >> > >> > > >
> >> > >> > > > Thanks,
> >> > >> > > >
> >> > >> > > > Pramod
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh <
> >> > >>>
> >> > >> > > > wrote:
> >> > >> > > >
> >> > >> > > > > Thanks Joe,
> >> > >> > > > >
> >> > >> > > > > This branch works. I was able to proceed. I still had to
> set
> >> > scala
> >> > >> > > > version
> >> > >> > > > > to 2.9.2 in
> >> > >> > > > >
> >> > >> > > > >
> >> > >> > > > >
> >> > >> > > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein <
> >> >>
> >> > >> > > wrote:
> >> > >> > > > >
> >> > >> > > > >> That is a very old branch.
> >> > >> > > > >>
> >> > >> > > > >> Here is a more up to date one
> >> > >> > > > >>
> >> > >> (needs to
> >> > >> > > be
> >> > >> > > > >> updated to latest trunk might have a chance to-do that
> next
> >> > >> week).
> >> > >> > > > >>
> >> > >> > > > >> You should be using gradle now as per the README.
> >> > >> > > > >>
> >> > >> > > > >> /*******************************************
> >> > >> > > > >>  Joe Stein
> >> > >> > > > >>  Founder, Principal Consultant
> >> > >> > > > >>  Big Data Open Source Security LLC
> >> > >> > > > >>
> >> > >> > > > >>  Twitter: @allthingshadoop <
> >> > >>>
> >> > >> > > > >> ********************************************/
> >> > >> > > > >>
> >> > >> > > > >>
> >> > >> > > > >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh <
> >> > >> >>
> >> > >> > > > >> wrote:
> >> > >> > > > >>
> >> > >> > > > >> > Thanks Joe for this,
> >> > >> > > > >> >
> >> > >> > > > >> > I cloned this branch and tried to run zookeeper but I
> get
> >> > >> > > > >> >
> >> > >> > > > >> > Error: Could not find or load main class
> >> > >> > > > >> > org.apache.zookeeper.server.quorum.QuorumPeerMain
> >> > >> > > > >> >
> >> > >> > > > >> >
> >> > >> > > > >> > I see scala version is still set to 2.8.0
> >> > >> > > > >> >
> >> > >> > > > >> > if [ -z "$SCALA_VERSION" ]; then
> >> > >> > > > >> >
> >> > >> > > > >> >         SCALA_VERSION=2.8.0
> >> > >> > > > >> >
> >> > >> > > > >> > fi
> >> > >> > > > >> >
> >> > >> > > > >> >
> >> > >> > > > >> >
> >> > >> > > > >> > Then I installed sbt and scala and followed your
> >> instructions
> >> > >> for
> >> > >> > > > >> different
> >> > >> > > > >> > scala versions. I was able to bring zookeeper up but
> >> brokers
> >> > >> fail
> >> > >> > to
> >> > >> > > > >> start
> >> > >> > > > >> > with error
> >> > >> > > > >> >
> >> > >> > > > >> > Error: Could not find or load main class kafka.Kafka
> >> > >> > > > >> >
> >> > >> > > > >> > I think I am doing something wrong. Can you please help
> >> me?
> >> > >> > > > >> >
> >> > >> > > > >> > Our current production setup is with 2.8.0 and want to
> >> stick
> >> > to
> >> > >> > it.
> >> > >> > > > >> >
> >> > >> > > > >> > Thanks,
> >> > >> > > > >> >
> >> > >> > > > >> > Pramod
> >> > >> > > > >> >
> >> > >> > > > >> >
> >> > >> > > > >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein <
> >> > >>>
> >> > >> > > > wrote:
> >> > >> > > > >> >
> >> > >> > > > >> > > Hi,I wanted to re-ignite the discussion around Apache
> >> Kafka
> >> > >> > > > Security.
> >> > >> > > > >> >  This
> >> > >> > > > >> > > is a huge bottleneck (non-starter in some cases) for a
> >> lot
> >> > of
> >> > >> > > > >> > organizations
> >> > >> > > > >> > > (due to regulatory, compliance and other
> requirements).
> >> > Below
> >> > >> > are
> >> > >> > > my
> >> > >> > > > >> > > suggestions for specific changes in Kafka to
> accommodate
> >> > >> > security
> >> > >> > > > >> > > requirements.  This comes from what folks are doing
> "in
> >> the
> >> > >> > wild"
> >> > >> > > to
> >> > >> > > > >> > > workaround and implement security with Kafka as it is
> >> today
> >> > >> and
> >> > >> > > also
> >> > >> > > > >> > what I
> >> > >> > > > >> > > have discovered from organizations about their
> >> blockers. It
> >> > >> also
> >> > >> > > > >> picks up
> >> > >> > > > >> > > from the wiki (which I should have time to update
> later
> >> in
> >> > >> the
> >> > >> > > week
> >> > >> > > > >> based
> >> > >> > > > >> > > on the below and feedback from the thread).
> >> > >> > > > >> > >
> >> > >> > > > >> > > 1) Transport Layer Security (i.e. SSL)
> >> > >> > > > >> > >
> >> > >> > > > >> > > This also includes client authentication in addition
> to
> >> > >> > in-transit
> >> > >> > > > >> > security
> >> > >> > > > >> > > layer.  This work has been picked up here
> >> > >> > > > >> > > and
> do
> >> > >> > > appreciate
> >> > >> > > > >> any
> >> > >> > > > >> > > thoughts, comments, feedback, tomatoes, whatever for
> >> this
> >> > >> patch.
> >> > >> > >  It
> >> > >> > > > >> is a
> >> > >> > > > >> > > pickup from the fork of the work first done here
> >> > >> > > > >> > >
> >> > >> > > > >> > >
> >> > >> > > > >> > > 2) Data encryption at rest.
> >> > >> > > > >> > >
> >> > >> > > > >> > > This is very important and something that can be
> >> > facilitated
> >> > >> > > within
> >> > >> > > > >> the
> >> > >> > > > >> > > wire protocol. It requires an additional map data
> >> structure
> >> > >> for
> >> > >> > > the
> >> > >> > > > >> > > "encrypted [data encryption key]". With this map
> >> (either in
> >> > >> your
> >> > >> > > > >> object
> >> > >> > > > >> > or
> >> > >> > > > >> > > in the wire protocol) you can store the dynamically
> >> > generated
> >> > >> > > > >> symmetric
> >> > >> > > > >> > key
> >> > >> > > > >> > > (for each message) and then encrypt the data using
> that
> >> > >> > > dynamically
> >> > >> > > > >> > > generated key.  You then encrypt the encryption key
> >> using
> >> > >> each
> >> > >> > > > public
> >> > >> > > > >> key
> >> > >> > > > >> > > for whom is expected to be able to decrypt the
> >> encryption
> >> > >> key to
> >> > >> > > > then
> >> > >> > > > >> > > decrypt the message.  For each public key encrypted
> >> > symmetric
> >> > >> > key
> >> > >> > > > >> (which
> >> > >> > > > >> > is
> >> > >> > > > >> > > now the "encrypted [data encryption key]" along with
> >> which
> >> > >> > public
> >> > >> > > > key
> >> > >> > > > >> it
> >> > >> > > > >> > > was encrypted with for (so a map of [publicKey] =
> >> > >> > > > >> > > encryptedDataEncryptionKey) as a chain.   Other
> patterns
> >> > can
> >> > >> be
> >> > >> > > > >> > implemented
> >> > >> > > > >> > > but this is a pretty standard digital enveloping [0]
> >> > pattern
> >> > >> > with
> >> > >> > > > >> only 1
> >> > >> > > > >> > > field added. Other patterns should be able to use that
> >> > field
> >> > >> > to-do
> >> > >> > > > >> their
> >> > >> > > > >> > > implementation too.
> >> > >> > > > >> > >
> >> > >> > > > >> > > 3) Non-repudiation and long term non-repudiation.
> >> > >> > > > >> > >
> >> > >> > > > >> > > Non-repudiation is proving data hasn't changed.  This
> is
> >> > >> often
> >> > >> > (if
> >> > >> > > > not
> >> > >> > > > >> > > always) done with x509 public certificates (chained
> to a
> >> > >> > > certificate
> >> > >> > > > >> > > authority).
> >> > >> > > > >> > >
> >> > >> > > > >> > > Long term non-repudiation is what happens when the
> >> > >> certificates
> >> > >> > of
> >> > >> > > > the
> >> > >> > > > >> > > certificate authority are expired (or revoked) and
> >> > everything
> >> > >> > ever
> >> > >> > > > >> signed
> >> > >> > > > >> > > (ever) with that certificate's public key then becomes
> >> "no
> >> > >> > longer
> >> > >> > > > >> > provable
> >> > >> > > > >> > > as ever being authentic".  That is where RFC3126 [1]
> and
> >> > >> RFC3161
> >> > >> > > [2]
> >> > >> > > > >> come
> >> > >> > > > >> > > in (or worm drives [hardware], etc).
> >> > >> > > > >> > >
> >> > >> > > > >> > > For either (or both) of these it is an operation of
> the
> >> > >> > encryptor
> >> > >> > > to
> >> > >> > > > >> > > sign/hash the data (with or without third party
> trusted
> >> > >> timestap
> >> > >> > > of
> >> > >> > > > >> the
> >> > >> > > > >> > > signing event) and encrypt that with their own private
> >> key
> >> > >> and
> >> > >> > > > >> distribute
> >> > >> > > > >> > > the results (before and after encrypting if required)
> >> along
> >> > >> with
> >> > >> > > > their
> >> > >> > > > >> > > public key. This structure is a bit more complex but
> >> > >> feasible,
> >> > >> > it
> >> > >> > > > is a
> >> > >> > > > >> > map
> >> > >> > > > >> > > of digital signature formats and the chain of dig sig
> >> > >> > > attestations.
> >> > >> > > > >>  The
> >> > >> > > > >> > > map's key being the method (i.e. CRC32, PKCS7 [3],
> >> > XmlDigSig
> >> > >> > [4])
> >> > >> > > > and
> >> > >> > > > >> > then
> >> > >> > > > >> > > a list of map where that key is "purpose" of signature
> >> > (what
> >> > >> > your
> >> > >> > > > >> > attesting
> >> > >> > > > >> > > too).  As a sibling field to the list another field
> for
> >> > "the
> >> > >> > > > >> attester" as
> >> > >> > > > >> > > bytes (e.g. their PKCS12 [5] for the map of PKCS7
> >> > >> signatures).
> >> > >> > > > >> > >
> >> > >> > > > >> > > 4) Authorization
> >> > >> > > > >> > >
> >> > >> > > > >> > > We should have a policy of "404" for data, topics,
> >> > partitions
> >> > >> > > (etc)
> >> > >> > > > if
> >> > >> > > > >> > > authenticated connections do not have access.  In
> >> "secure
> >> > >> mode"
> >> > >> > > any
> >> > >> > > > >> non
> >> > >> > > > >> > > authenticated connections should get a "404" type
> >> message
> >> > on
> >> > >> > > > >> everything.
> >> > >> > > > >> > > Knowing "something is there" is a security risk in
> many
> >> > uses
> >> > >> > > cases.
> >> > >> > > > >>  So
> >> > >> > > > >> > if
> >> > >> > > > >> > > you don't have access you don't even see it.  Baking
> >> "that"
> >> > >> into
> >> > >> > > > Kafka
> >> > >> > > > >> > > along with some interface for entitlement (access
> >> > management)
> >> > >> > > > systems
> >> > >> > > > >> > > (pretty standard) is all that I think needs to be done
> >> to
> >> > the
> >> > >> > core
> >> > >> > > > >> > project.
> >> > >> > > > >> > >  I want to tackle item later in the year after summer
> >> after
> >> > >> the
> >> > >> > > > other
> >> > >> > > > >> > three
> >> > >> > > > >> > > are complete.
> >> > >> > > > >> > >
> >> > >> > > > >> > > I look forward to thoughts on this and anyone else
> >> > >> interested in
> >> > >> > > > >> working
> >> > >> > > > >> > > with us on these items.
> >> > >> > > > >> > >
> >> > >> > > > >> > > [0]
> >> > >> > > > >> > >
> >> > >> > > > >> > >
> >> > >> > > > >> >
> >> > >> > > > >>
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> >
> >>
> >> > >> > > > >> > > [1]
> >> > >> > > > >> > > [2]
> >> > >> > > > >> > > [3]
> >> > >> > > > >> > >
> >> > >> > > > >> > >
> >> > >> > > > >> >
> >> > >> > > > >>
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> >
> >>
> >> > >> > > > >> > > [4]
> >> > >> > > > >> > > [5]
> >> > >> > > > >> > >
> >> > >> > > > >> > > /*******************************************
> >> > >> > > > >> > >  Joe Stein
> >> > >> > > > >> > >  Founder, Principal Consultant
> >> > >> > > > >> > >  Big Data Open Source Security LLC
> >> > >> > > > >> > >
> >> > >> > > > >> > >  Twitter: @allthingshadoop <
> >> > >> > >>
> >> > >> > > > >> > > ********************************************/
> >> > >> > > > >> > >
> >> > >> > > > >> >
> >> > >> > > > >>
> >> > >> > > > >
> >> > >> > > > >
> >> > >> > > >
> >> > >> > >
> >> > >> > >
> >> > >> > >
> >> > >> > > --
> >> > >> > > Thanks,
> >> > >> > > Raja.
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >>
> >> > >>
> >> > >> --
> >> > >> Thanks,
> >> > >> Raja.
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >
> >

Reply via email to