Re: [DISCUSS] Kafka Security Specific Features

2014-07-16 Thread Pramod Deshmukh
Thanks Joe,

This branch works and I was able to proceed. I still had to set the Scala
version to 2.9.2 in kafka-run-class.sh.



On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein  wrote:

> That is a very old branch.
>
> Here is a more up-to-date one:
> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (it needs to be
> updated to the latest trunk; I might have a chance to do that next week).
>
> You should be using Gradle now, as per the README.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********/

Re: [DISCUSS] Kafka Security Specific Features

2014-07-16 Thread Pramod Deshmukh
Hello Joe,

Is there a configuration or example I can use to test the Kafka security piece?

Thanks,

Pramod



Re: [DISCUSS] Kafka Security Specific Features

2014-07-16 Thread Pramod Deshmukh
Thanks Joe for this,

I cloned this branch and tried to run ZooKeeper, but I get:

Error: Could not find or load main class org.apache.zookeeper.server.quorum.QuorumPeerMain

I see the Scala version is still set to 2.8.0:

if [ -z "$SCALA_VERSION" ]; then
  SCALA_VERSION=2.8.0
fi

Then I installed sbt and Scala and followed your instructions for different
Scala versions. I was able to bring ZooKeeper up, but the brokers fail to
start with this error:

Error: Could not find or load main class kafka.Kafka

I think I am doing something wrong. Can you please help me?

Our current production setup uses 2.8.0 and we want to stick with it.

Thanks,

Pramod


On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein  wrote:

> Hi, I wanted to re-ignite the discussion around Apache Kafka security. This
> is a huge bottleneck (a non-starter in some cases) for a lot of
> organizations (due to regulatory, compliance and other requirements). Below
> are my suggestions for specific changes in Kafka to accommodate security
> requirements. They come from what folks are doing "in the wild" to work
> around and implement security with Kafka as it is today, and also from what
> I have discovered from organizations about their blockers. They also pick up
> from the wiki (which I should have time to update later in the week, based
> on the below and feedback from the thread).
>
> 1) Transport Layer Security (i.e. SSL)
>
> This also includes client authentication, in addition to the in-transit
> security layer. This work has been picked up here:
> https://issues.apache.org/jira/browse/KAFKA-1477 and I would appreciate any
> thoughts, comments, feedback, tomatoes, whatever for this patch. It picks
> up from a fork of the work first done here:
> https://github.com/relango/kafka/tree/kafka_security.
>
> 2) Data encryption at rest.
>
> This is very important and something that can be facilitated within the
> wire protocol. It requires an additional map data structure for the
> "encrypted [data encryption key]". With this map (either in your object or
> in the wire protocol) you generate a symmetric key dynamically (for each
> message) and encrypt the data using that key. You then encrypt that data
> encryption key with the public key of each party that is expected to be
> able to decrypt the message. Each resulting public-key-encrypted symmetric
> key (which is now the "encrypted [data encryption key]") is stored along
> with the public key it was encrypted with, so the field is a map of
> [publicKey] = encryptedDataEncryptionKey, kept as a chain. Other patterns
> can be implemented, but this is a pretty standard digital enveloping [0]
> pattern with only 1 field added. Other patterns should be able to use that
> field to do their implementation too.
>
> 3) Non-repudiation and long term non-repudiation.
>
> Non-repudiation is proving who produced the data and that it hasn't
> changed, so that the signer cannot later deny it. This is often (if not
> always) done with X.509 public certificates (chained to a certificate
> authority).
>
> Long term non-repudiation addresses what happens when the certificates of
> the certificate authority expire (or are revoked) and everything ever
> signed with that certificate's key becomes "no longer provable as ever
> being authentic". That is where RFC3126 [1] and RFC3161 [2] come in (or
> WORM drives [hardware], etc).
>
> For either (or both) of these it is an operation of the encryptor to
> sign/hash the data (with or without a trusted third-party timestamp of the
> signing event), encrypt that with their own private key, and distribute
> the results (before and after encrypting if required) along with their
> public key. This structure is a bit more complex but feasible: it is a map
> of digital signature formats and the chain of digital signature
> attestations. The map's key is the method (e.g. CRC32, PKCS7 [3],
> XmlDigSig [4]) and its value is a list of maps where the key is the
> "purpose" of the signature (what you're attesting to). As a sibling field
> to the list there is another field for "the attester" as bytes (e.g. their
> PKCS12 [5] for the map of PKCS7 signatures).
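
A rough sketch of the structure described in item 3, written in Python purely for illustration. The names `MethodEntry`, `attest_crc32` and `verify_crc32` are hypothetical and are not part of Kafka or of any patch in this thread. Only the CRC32 method is filled in, because it needs no key material; CRC32 is an integrity check rather than a signature, so a real attestation would use something like PKCS7 in its place.

```python
import zlib
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MethodEntry:
    """Everything attested with one method, e.g. "CRC32" or "PKCS7"."""
    attester: bytes = b""  # sibling field: who attests, as raw bytes
    # list of maps, each keyed by the "purpose" of the attestation
    attestations: List[Dict[str, bytes]] = field(default_factory=list)


def crc32_bytes(payload: bytes) -> bytes:
    """CRC32 of the payload as 4 big-endian bytes."""
    return zlib.crc32(payload).to_bytes(4, "big")


def attest_crc32(signatures: Dict[str, MethodEntry], payload: bytes,
                 purpose: str, attester: bytes) -> None:
    """Record a CRC32 attestation for `purpose` under the "CRC32" method key."""
    entry = signatures.setdefault("CRC32", MethodEntry(attester=attester))
    entry.attestations.append({purpose: crc32_bytes(payload)})


def verify_crc32(signatures: Dict[str, MethodEntry], payload: bytes,
                 purpose: str) -> bool:
    """True if some CRC32 attestation for `purpose` matches the payload."""
    entry = signatures.get("CRC32")
    if entry is None:
        return False
    expected = crc32_bytes(payload)
    return any(a.get(purpose) == expected for a in entry.attestations)


# Outer map: method name -> MethodEntry (hypothetical example values).
signatures: Dict[str, MethodEntry] = {}
message = b"hello kafka"
attest_crc32(signatures, message, "integrity", b"hypothetical-attester-id")
print(verify_crc32(signatures, message, "integrity"))
print(verify_crc32(signatures, message, "origin"))
```

The outer dictionary plays the role of the map keyed by method; adding a second method would mean adding another key with its own attester bytes and attestation list.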
>
> 4) Authorization
>
> We should have a policy of "404" for data, topics, partitions (etc) if
> authenticated connections do not have access. In "secure mode" any
> non-authenticated connection should get a "404" type message on everything.
> Knowing "something is there" is a security risk in many use cases. So if
> you don't have access you don't even see it. Baking "that" into Kafka
> along with some interface for entitlement (access management) systems
> (pretty standard) is all that I think needs to be done to the core project.
> I want to tackle this item later in the year, after summer, once the other
> three are complete.
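
The "404" policy in item 4 can be sketched as follows. This is a minimal Python illustration under stated assumptions, not Kafka code: `describe_topic`, `NOT_FOUND` and the `acls` map are hypothetical names, and the entitlement data would in practice come from an external access management system. Every kind of denial returns the same answer, so a caller cannot tell a missing topic from one it is not allowed to see.

```python
from typing import Dict, Optional, Set

NOT_FOUND = "404"


def describe_topic(acls: Dict[str, Set[str]], topic: str,
                   principal: Optional[str]) -> str:
    """Return topic info, or the same NOT_FOUND answer for every denial."""
    if principal is None:
        # Unauthenticated connection in secure mode: 404 on everything.
        return NOT_FOUND
    allowed = acls.get(topic)
    if allowed is None:
        # The topic does not exist.
        return NOT_FOUND
    if principal not in allowed:
        # The topic exists, but this caller has no access to it.
        return NOT_FOUND
    return f"topic={topic}"


# Hypothetical entitlement data: topic name -> principals allowed to see it.
acls = {"secureTopic": {"alice"}}

for who, topic in [("alice", "secureTopic"), ("bob", "secureTopic"),
                   ("bob", "noSuchTopic"), (None, "secureTopic")]:
    print(who, topic, describe_topic(acls, topic, who))
```

Keeping the three denial branches separate but returning an identical value makes it easy to audit that no branch leaks existence through a different error.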
>
> I look forward to thoughts on this and anyone else interested in working
> with us on these items.
>
> [0] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/what-is-a-digital-envelope.htm
> [1] http://tools.ietf.org/html/rfc3126
> [2] http://tools.ietf.org/html/rfc3161

Re: [DISCUSS] Kafka Security Specific Features

2014-07-17 Thread Pramod Deshmukh
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
    at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)



On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango 
wrote:

> Pramod,
>
>
> I presented secure Kafka configuration and usage at the last meetup, so I
> hope this video recording <http://www.ustream.tv/recorded/48396701> helps.
> You can skip to about 59 min to jump to the security talk.
>
> Thanks,
> Raja.
>
>

Re: [DISCUSS] Kafka Security Specific Features

2014-07-17 Thread Pramod Deshmukh
Correct, I don't see any exceptions when I turn off security. The consumer is
able to consume the message.

I still see a warning for the topic property:

[2014-07-17 18:04:38,360] WARN Property topic is not valid (kafka.utils.VerifiableProperties)





On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango 
wrote:

> Can you try turning off security to check whether this error happens only
> in secure mode?
>
> Thanks,
> Raja.
>
>
>
>
> On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh 
> wrote:
>
> > Thanks Raja, it was helpful.
> >
> > Now I am able to start ZooKeeper and the broker in secure mode, ready for
> > the SSL handshake. I get java.lang.OutOfMemoryError: Java heap space on
> > the producer.
> >
> > I am using the default configuration and keystore. Is there anything
> > missing?
> >
> > Start broker:
> >
> > bin/kafka-server-start.sh config/server.properties
> >
> > broker.log:
> >
> > [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> > [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' (kafka.log.LogManager)
> > [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log secure.test-0. (kafka.log.Log)
> > [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log end offset 0 (kafka.log.Log)
> > [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 6 ms. (kafka.log.LogManager)
> > [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
> > [2014-07-17 15:34:46,614] INFO Initializing secure authentication (kafka.network.security.SecureAuth$)
> > [2014-07-17 15:34:46,678] INFO Secure authentication initialization has been successfully completed (kafka.network.security.SecureAuth$)
> > [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
> > [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
> > [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
> > [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
> > [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0 with address 10.1.100.130:9092. (kafka.utils.ZkUtils$)
> > [2014-07-17 15:34:47,059] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> > [2014-07-17 15:34:47,068] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
> > [2014-07-17 15:34:47,383] INFO begin ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:34:47,392] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:34:47,465] INFO finished ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:34:47,465] INFO finished ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> > [2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)
> > [2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [secure.test,0] (kafka.server.ReplicaFetcherManager)
> > [2014-07-17 15:37:15,970] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,075] INFO begin ssl handshake for
> > 10.1.100.130/10.1.100.130:51

Re: [DISCUSS] Kafka Security Specific Features

2014-07-18 Thread Pramod Deshmukh
Hello Raja/Joe,
When I turn on security, I still get an out-of-memory error on the producer.
Is this something to do with keys? Is there any other way I can connect to
the broker?

*producer log*
[2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space

*broker log*

INFO begin ssl handshake for localhost/127.0.0.1:50199//127.0.0.1:9092
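
One possible reading of the producer error above (an interpretation, not something confirmed in this thread): the reported size 352518400 is 0x15030100 in hex. Those four bytes match the start of a TLS alert record (content type 0x15, protocol version 3.1, which is TLS 1.0). That would suggest the producer read an SSL-level reply as a plaintext 4-byte length prefix and tried to allocate a buffer of that size. A minimal Python sketch of the arithmetic, assuming the size field is a big-endian 32-bit integer; `describe_prefix` is a hypothetical helper:

```python
import struct

# Size reported by the producer log line "OOME with size 352518400".
reported_size = 352518400

# Re-encode the size as the four bytes a big-endian int32 length prefix occupies.
prefix = struct.pack(">i", reported_size)

# TLS record content types as defined by the TLS record protocol.
TLS_CONTENT_TYPES = {
    20: "change_cipher_spec",
    21: "alert",
    22: "handshake",
    23: "application_data",
}


def describe_prefix(raw: bytes) -> str:
    """Describe four bytes as if they were the start of a TLS record header."""
    content_type, major, minor, _length_high_byte = raw
    name = TLS_CONTENT_TYPES.get(content_type, "unknown")
    return f"type={name} version={major}.{minor}"


print(prefix.hex())
print(describe_prefix(prefix))
```

If this reading holds, the error would point at a mismatch between client and broker over whether the channel is encrypted, rather than at a genuine shortage of heap.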





Re: [DISCUSS] Kafka Security Specific Features

2014-07-18 Thread Pramod Deshmukh
or topics secureTopic with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)

[2014-07-18 13:12:46,283] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)



On Fri, Jul 18, 2014 at 11:56 AM, Joe Stein  wrote:

> Hi Pramod,
>
> Can you increase KAFKA_HEAP_OPTS to, let's say, -Xmx1G in
> kafka-console-producer.sh to see if that gets you further along in your
> testing, please?
>
> Thanks!
>
Re: [DISCUSS] Kafka Security Specific Features

2014-07-22 Thread Pramod Deshmukh
Is anyone else seeing this issue? Is it related to the environment, or is it
the code? The producer works fine when run in secure=false (no security) mode.


pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic

[2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties)

Hare Krishna

[2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:381)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)


On Fri, Jul 18, 2014 at 1:20 PM, Pramod Deshmukh  wrote:

> Thanks Joe, I no longer see the out-of-memory error. Now I get an exception
> when the producer fetches metadata for a topic.
>
> Here is how I created the topic and ran the producer:
>
> pdeshmukh$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic secureTopic
> Created topic "secureTopic".
>
> pdeshmukh$ bin/kafka-topics.sh --list --zookeeper localhost:2181
> secure.test
> secureTopic
>
> Run the producer (I tried both localhost:9092:true and localhost:9092):
>
> pdeshmukh$ bin/kafka-console-producer.sh --broker-list localhost:9092:true --topic secureTopic
> [2014-07-18 13:12:29,817] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
>
> Hare Krishna
>
> [2014-07-18 13:12:45,256] WARN Fetching topic metadata with correlation id 0 for topics [Set(secureTopic)] from broker [id:0,host:localhost,port:9092,secure:true] failed (kafka.client.ClientUtils$)
>
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:381)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.produce