Can you try turning off security to check whether this error happens only
in secure mode?
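
One observation on the reported size, offered as a hint rather than a
diagnosis: Kafka frames each response with a 4-byte big-endian length, and
the size in the "OOME with size 352518400" line below, viewed as raw bytes,
happens to look like the start of a TLS record header. The sketch below
(Python, standard library only; the TLS reading is my assumption, not
something the logs confirm) shows the arithmetic:

```python
import struct

# Size reported in the producer log line "OOME with size 352518400".
reported_size = 352518400

# Kafka prefixes each response with a 4-byte big-endian size, so recover
# the four bytes the client must have read off the socket.
header = struct.pack(">i", reported_size)
print(header.hex())  # 15030100

# Assumption: read those bytes as the start of a TLS record header
# (1-byte content type, then a 2-byte protocol version).
content_type, major, minor = header[0], header[1], header[2]
TLS_CONTENT_TYPES = {
    20: "change_cipher_spec",
    21: "alert",
    22: "handshake",
    23: "application_data",
}
print(TLS_CONTENT_TYPES.get(content_type), (major, minor))  # alert (3, 1)
```

If that reading is right, the producer would be parsing the broker's SSL
bytes as a plaintext Kafka response and trying to allocate a buffer of that
"size", which is why comparing against a non-secure run is worth doing.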

Thanks,
Raja.




On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:

> Thanks Raja, it was helpful
>
> Now I am able to start zookeeper and broker in secure mode ready for SSL
> handshake. I get *java.lang.OutOfMemoryError: Java heap space* on producer.
>
> I am using the default configuration and keystore. Is there anything missing?
>
> *Start broker:*
>
> *bin/kafka-server-start.sh config/server.properties*
>
>
>
> *broker.log:*
>
> [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
>
> [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0'
> (kafka.log.LogManager)
>
> [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log
> secure.test-0. (kafka.log.Log)
>
> [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log
> end offset 0 (kafka.log.Log)
>
> [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 60000
> ms. (kafka.log.LogManager)
>
> [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period
> of 9223372036854775807 ms. (kafka.log.LogManager)
>
> [2014-07-17 15:34:46,614] INFO Initializing secure authentication
> (kafka.network.security.SecureAuth$)
>
> [2014-07-17 15:34:46,678] INFO Secure authentication initialization has
> been successfully completed (kafka.network.security.SecureAuth$)
>
> [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092.
> (kafka.network.Acceptor)
>
> [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started
> (kafka.network.SocketServer)
>
> [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in
> the classpath (kafka.utils.Mx4jLoader$)
>
> [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
>
> [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0
> with address 10.1.100.130:9092. (kafka.utils.ZkUtils$)
>
> [2014-07-17 15:34:47,059] INFO New leader is 0
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
>
> *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started
> (kafka.server.KafkaServer)*
>
> *[2014-07-17 15:34:47,383] INFO begin ssl handshake for
> /10.1.100.130:9092//10.1.100.130:51685
> (kafka.network.security.SSLSocketChannel)*
>
> *[2014-07-17 15:34:47,392] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)*
>
> *[2014-07-17 15:34:47,465] INFO finished ssl handshake for
> 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)*
>
> *[2014-07-17 15:34:47,465] INFO finished ssl handshake for
> /10.1.100.130:9092//10.1.100.130:51685
> (kafka.network.security.SSLSocketChannel)*
>
> *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)*
>
> *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added
> fetcher for partitions List() (kafka.server.ReplicaFetcherManager)*
>
> *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed
> fetcher for partitions [secure.test,0]
> (kafka.server.ReplicaFetcherManager)*
>
> [2014-07-17 15:37:15,970] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:16,075] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51690//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:16,434] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51691//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:16,530] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51692//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:16,743] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51693//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:16,834] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51694//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:17,043] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51695//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:17,137] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51696//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:17,342] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51697//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
>
> *Start producer*
>
> *bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic secure.test*
>
>
> *producer.log:*
>
> bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic
> secure.test
>
> [2014-07-17 15:37:46,889] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
>
> Hello Secure Kafka
>
> *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400
> (kafka.network.BoundedByteBufferReceive)*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
>
>
> On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango <rela...@salesforce.com>
> wrote:
>
> > Pramod,
> >
> >
> > I presented secure Kafka configuration and usage at the last meetup, so I
> > hope this video recording <http://www.ustream.tv/recorded/48396701> helps.
> > You can skip to about the 59-minute mark to jump to the security talk.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh <dpram...@gmail.com>
> > wrote:
> >
> > > Hello Joe,
> > >
> > > Is there a configuration or example to test the Kafka security piece?
> > >
> > > Thanks,
> > >
> > > Pramod
> > >
> > >
> > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh <dpram...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Joe,
> > > >
> > > > This branch works. I was able to proceed. I still had to set scala
> > > > version to 2.9.2 in kafka-run-class.sh.
> > > >
> > > >
> > > >
> > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > > >
> > > >> That is a very old branch.
> > > >>
> > > >> Here is a more up to date one
> > > >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (it needs to
> > > >> be updated to the latest trunk; I might have a chance to do that next
> > > >> week).
> > > >>
> > > >> You should be using gradle now as per the README.
> > > >>
> > > >> /*******************************************
> > > >>  Joe Stein
> > > >>  Founder, Principal Consultant
> > > >>  Big Data Open Source Security LLC
> > > >>  http://www.stealth.ly
> > > >>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > >> ********************************************/
> > > >>
> > > >>
> > > >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh <dpram...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Thanks Joe for this,
> > > >> >
> > > >> > I cloned this branch and tried to run zookeeper but I get
> > > >> >
> > > >> > Error: Could not find or load main class
> > > >> > org.apache.zookeeper.server.quorum.QuorumPeerMain
> > > >> >
> > > >> >
> > > >> > I see scala version is still set to 2.8.0
> > > >> >
> > > >> > if [ -z "$SCALA_VERSION" ]; then
> > > >> >
> > > >> >         SCALA_VERSION=2.8.0
> > > >> >
> > > >> > fi
> > > >> >
> > > >> >
> > > >> >
> > > >> > Then I installed sbt and scala and followed your instructions for
> > > >> > different scala versions. I was able to bring zookeeper up but
> > > >> > brokers fail to start with error
> > > >> >
> > > >> > Error: Could not find or load main class kafka.Kafka
> > > >> >
> > > >> > I think I am doing something wrong. Can you please help me?
> > > >> >
> > > >> > Our current production setup is with 2.8.0 and we want to stick to it.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Pramod
> > > >> >
> > > >> >
> > > >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > > >> >
> > > >> > > Hi, I wanted to re-ignite the discussion around Apache Kafka
> > > >> > > Security. This is a huge bottleneck (a non-starter in some cases)
> > > >> > > for a lot of organizations (due to regulatory, compliance and
> > > >> > > other requirements). Below are my suggestions for specific
> > > >> > > changes in Kafka to accommodate security requirements. This comes
> > > >> > > from what folks are doing "in the wild" to work around and
> > > >> > > implement security with Kafka as it is today, and also what I
> > > >> > > have discovered from organizations about their blockers. It also
> > > >> > > picks up from the wiki (which I should have time to update later
> > > >> > > in the week based on the below and feedback from the thread).
> > > >> > >
> > > >> > > 1) Transport Layer Security (i.e. SSL)
> > > >> > >
> > > >> > > This also includes client authentication in addition to the
> > > >> > > in-transit security layer. This work has been picked up here
> > > >> > > https://issues.apache.org/jira/browse/KAFKA-1477 and I do
> > > >> > > appreciate any thoughts, comments, feedback, tomatoes, whatever
> > > >> > > for this patch. It is a pickup from the fork of the work first
> > > >> > > done here https://github.com/relango/kafka/tree/kafka_security.
> > > >> > >
> > > >> > > 2) Data encryption at rest.
> > > >> > >
> > > >> > > This is very important and something that can be facilitated
> > > >> > > within the wire protocol. It requires an additional map data
> > > >> > > structure for the "encrypted [data encryption key]". With this
> > > >> > > map (either in your object or in the wire protocol) you can store
> > > >> > > the dynamically generated symmetric key (for each message) and
> > > >> > > then encrypt the data using that dynamically generated key. You
> > > >> > > then encrypt the encryption key using the public key of each
> > > >> > > party that is expected to be able to decrypt the encryption key
> > > >> > > and then decrypt the message. Each public-key-encrypted symmetric
> > > >> > > key (which is now the "encrypted [data encryption key]") is
> > > >> > > stored along with the public key it was encrypted with (so a map
> > > >> > > of [publicKey] = encryptedDataEncryptionKey) as a chain. Other
> > > >> > > patterns can be implemented, but this is a pretty standard
> > > >> > > digital enveloping [0] pattern with only 1 field added. Other
> > > >> > > patterns should be able to use that field to do their
> > > >> > > implementation too.
> > > >> > >
> > > >> > > 3) Non-repudiation and long term non-repudiation.
> > > >> > >
> > > >> > > Non-repudiation is proving data hasn't changed. This is often (if
> > > >> > > not always) done with x509 public certificates (chained to a
> > > >> > > certificate authority).
> > > >> > >
> > > >> > > Long term non-repudiation is what happens when the certificates
> > > >> > > of the certificate authority are expired (or revoked) and
> > > >> > > everything ever signed (ever) with that certificate's private key
> > > >> > > then becomes "no longer provable as ever being authentic". That
> > > >> > > is where RFC3126 [1] and RFC3161 [2] come in (or worm drives
> > > >> > > [hardware], etc).
> > > >> > >
> > > >> > > For either (or both) of these it is an operation of the encryptor
> > > >> > > to sign/hash the data (with or without a third-party trusted
> > > >> > > timestamp of the signing event), encrypt that with their own
> > > >> > > private key, and distribute the results (before and after
> > > >> > > encrypting if required) along with their public key. This
> > > >> > > structure is a bit more complex but feasible: it is a map of
> > > >> > > digital signature formats and the chain of dig sig attestations.
> > > >> > > The map's key is the method (i.e. CRC32, PKCS7 [3], XmlDigSig
> > > >> > > [4]) and its value is a list of maps whose key is the "purpose"
> > > >> > > of the signature (what you're attesting to). As a sibling field
> > > >> > > to the list there is another field for "the attester" as bytes
> > > >> > > (e.g. their PKCS12 [5] for the map of PKCS7 signatures).
> > > >> > >
> > > >> > > 4) Authorization
> > > >> > >
> > > >> > > We should have a policy of "404" for data, topics, partitions
> > > >> > > (etc) if authenticated connections do not have access. In "secure
> > > >> > > mode" any non-authenticated connections should get a "404" type
> > > >> > > message on everything. Knowing "something is there" is a security
> > > >> > > risk in many use cases, so if you don't have access you don't
> > > >> > > even see it. Baking "that" into Kafka along with some interface
> > > >> > > for entitlement (access management) systems (pretty standard) is
> > > >> > > all that I think needs to be done to the core project. I want to
> > > >> > > tackle this item later in the year, after summer, once the other
> > > >> > > three are complete.
> > > >> > >
> > > >> > > I look forward to thoughts on this and anyone else interested in
> > > >> > > working with us on these items.
> > > >> > >
> > > >> > > [0] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/what-is-a-digital-envelope.htm
> > > >> > > [1] http://tools.ietf.org/html/rfc3126
> > > >> > > [2] http://tools.ietf.org/html/rfc3161
> > > >> > > [3] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/pkcs-7-cryptographic-message-syntax-standar.htm
> > > >> > > [4] http://en.wikipedia.org/wiki/XML_Signature
> > > >> > > [5] http://en.wikipedia.org/wiki/PKCS_12
> > > >> > >
> > > >> > > /*******************************************
> > > >> > >  Joe Stein
> > > >> > >  Founder, Principal Consultant
> > > >> > >  Big Data Open Source Security LLC
> > > >> > >  http://www.stealth.ly
> > > >> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > >> > > ********************************************/
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.
