Correct, I don't see any exceptions when I turn off security. The consumer is able to consume the message.
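
One possible reading of the `OOME with size 352518400` line in the producer log quoted below, offered as a guess rather than a confirmed diagnosis: Kafka's wire format starts each response with a 4-byte big-endian size, and the four bytes of 352518400 match the start of a TLS alert record (content type 0x15, version 0x0301). That would be consistent with the producer reading an SSL reply as plaintext, and with the error going away when security is off. The class and method names in this Java sketch are hypothetical and not part of Kafka:

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Kafka: shows which raw bytes a reader
// would have parsed to arrive at the size reported in the OOME log line.
final class SizePrefixCheck {

    /** Returns the four big-endian bytes that encode the given size. */
    static byte[] toBytes(int size) {
        // ByteBuffer defaults to big-endian (network) byte order.
        return ByteBuffer.allocate(4).putInt(size).array();
    }

    /** True if the bytes look like the start of a TLS record header. */
    static boolean looksLikeTlsRecord(byte[] b) {
        int contentType = b[0] & 0xFF; // 20..23 are the TLS record content types
        int major = b[1] & 0xFF;       // 3 for SSL 3.0 and the TLS 1.x versions
        return contentType >= 20 && contentType <= 23 && major == 3;
    }

    /** Formats bytes as space-separated lowercase hex. */
    static String hex(byte[] b) {
        StringBuilder sb = new StringBuilder();
        for (byte x : b) {
            sb.append(String.format("%02x ", x & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        byte[] b = toBytes(352518400);             // size taken from the producer log
        System.out.println(hex(b));                // prints 15 03 01 00
        System.out.println(looksLikeTlsRecord(b)); // prints true
    }
}
```

If this reading is right, the thing to check is whether the console producer was actually started with its secure settings, not the heap size.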
I still see the warning for the topic property:

[2014-07-17 18:04:38,360] WARN Property topic is not valid (kafka.utils.VerifiableProperties)


On Thu, Jul 17, 2014 at 5:49 PM, Rajasekar Elango <rela...@salesforce.com> wrote:

> Can you try turning off security to check whether this error happens only
> in secure mode?
>
> Thanks,
> Raja.
>
>
> On Thu, Jul 17, 2014 at 3:51 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:
>
> > Thanks Raja, it was helpful.
> >
> > Now I am able to start zookeeper and the broker in secure mode, ready for
> > the SSL handshake. I get *java.lang.OutOfMemoryError: Java heap space* on
> > the producer.
> >
> > I am using the default configuration and keystore. Is there anything missing?
> >
> > *Start broker:*
> >
> > *bin/kafka-server-start.sh config/server.properties*
> >
> > *broker.log:*
> >
> > [2014-07-17 15:34:46,281] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> > [2014-07-17 15:34:46,523] INFO Loading log 'secure.test-0' (kafka.log.LogManager)
> > [2014-07-17 15:34:46,558] INFO Recovering unflushed segment 0 in log secure.test-0. (kafka.log.Log)
> > [2014-07-17 15:34:46,571] INFO Completed load of log secure.test-0 with log end offset 0 (kafka.log.Log)
> > [2014-07-17 15:34:46,582] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
> > [2014-07-17 15:34:46,587] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
> > [2014-07-17 15:34:46,614] INFO Initializing secure authentication (kafka.network.security.SecureAuth$)
> > [2014-07-17 15:34:46,678] INFO Secure authentication initialization has been successfully completed (kafka.network.security.SecureAuth$)
> > [2014-07-17 15:34:46,691] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
> > [2014-07-17 15:34:46,692] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
> > [2014-07-17 15:34:46,794] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
> > [2014-07-17 15:34:46,837] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
> > [2014-07-17 15:34:47,057] INFO Registered broker 0 at path /brokers/ids/0 with address 10.1.100.130:9092. (kafka.utils.ZkUtils$)
> > [2014-07-17 15:34:47,059] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> > *[2014-07-17 15:34:47,068] INFO [Kafka Server 0], started (kafka.server.KafkaServer)*
> > *[2014-07-17 15:34:47,383] INFO begin ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)*
> > *[2014-07-17 15:34:47,392] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)*
> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for 10.1.100.130/10.1.100.130:51685//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)*
> > *[2014-07-17 15:34:47,465] INFO finished ssl handshake for /10.1.100.130:9092//10.1.100.130:51685 (kafka.network.security.SSLSocketChannel)*
> > *[2014-07-17 15:34:47,617] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions (kafka.server.ReplicaFetcherManager)*
> > *[2014-07-17 15:34:47,627] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List() (kafka.server.ReplicaFetcherManager)*
> > *[2014-07-17 15:34:47,656] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [secure.test,0] (kafka.server.ReplicaFetcherManager)*
> > [2014-07-17 15:37:15,970] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51689//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,075] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51690//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,434] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51691//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,530] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51692//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,743] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51693//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:16,834] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51694//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:17,043] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51695//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:17,137] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51696//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> > [2014-07-17 15:37:17,342] INFO begin ssl handshake for 10.1.100.130/10.1.100.130:51697//10.1.100.130:9092 (kafka.network.security.SSLSocketChannel)
> >
> > *Start producer*
> >
> > *bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic secure.test*
> >
> > *producer.log:*
> >
> > bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic secure.test
> > [2014-07-17 15:37:46,889] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
> > Hello Secure Kafka
> > *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400 (kafka.network.BoundedByteBufferReceive)*
> > *java.lang.OutOfMemoryError: Java heap space*
> >     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
> >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >     at kafka.utils.Utils$.swallow(Utils.scala:172)
> >     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >     at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >     at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >
> >
> > On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango <rela...@salesforce.com> wrote:
> >
> > > Pramod,
> > >
> > > I presented secure Kafka configuration and usage at the last meetup, so
> > > I hope this video recording <http://www.ustream.tv/recorded/48396701>
> > > helps. You can skip to about 59 min to jump to the security talk.
> > >
> > > Thanks,
> > > Raja.
> > >
> > >
> > > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:
> > >
> > > > Hello Joe,
> > > >
> > > > Is there a configuration or example to test the Kafka security piece?
> > > >
> > > > Thanks,
> > > >
> > > > Pramod
> > > >
> > > >
> > > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:
> > > >
> > > > > Thanks Joe,
> > > > >
> > > > > This branch works. I was able to proceed. I still had to set the
> > > > > scala version to 2.9.2 in kafka-run-class.sh.
> > > > >
> > > > >
> > > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > > > >
> > > > >> That is a very old branch.
> > > > >>
> > > > >> Here is a more up to date one
> > > > >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (needs to be
> > > > >> updated to latest trunk; might have a chance to do that next week).
> > > > >>
> > > > >> You should be using gradle now as per the README.
> > > > >>
> > > > >> /*******************************************
> > > > >>  Joe Stein
> > > > >>  Founder, Principal Consultant
> > > > >>  Big Data Open Source Security LLC
> > > > >>  http://www.stealth.ly
> > > > >>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > >> ********************************************/
> > > > >>
> > > > >>
> > > > >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh <dpram...@gmail.com> wrote:
> > > > >>
> > > > >> > Thanks Joe for this,
> > > > >> >
> > > > >> > I cloned this branch and tried to run zookeeper but I get
> > > > >> >
> > > > >> > Error: Could not find or load main class
> > > > >> > org.apache.zookeeper.server.quorum.QuorumPeerMain
> > > > >> >
> > > > >> > I see the scala version is still set to 2.8.0
> > > > >> >
> > > > >> > if [ -z "$SCALA_VERSION" ]; then
> > > > >> >   SCALA_VERSION=2.8.0
> > > > >> > fi
> > > > >> >
> > > > >> > Then I installed sbt and scala and followed your instructions for
> > > > >> > different scala versions. I was able to bring zookeeper up but the
> > > > >> > brokers fail to start with the error
> > > > >> >
> > > > >> > Error: Could not find or load main class kafka.Kafka
> > > > >> >
> > > > >> > I think I am doing something wrong. Can you please help me?
> > > > >> >
> > > > >> > Our current production setup is with 2.8.0 and we want to stick to it.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Pramod
> > > > >> >
> > > > >> >
> > > > >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > > > >> >
> > > > >> > > Hi, I wanted to re-ignite the discussion around Apache Kafka
> > > > >> > > Security. This is a huge bottleneck (non-starter in some cases)
> > > > >> > > for a lot of organizations (due to regulatory, compliance and
> > > > >> > > other requirements). Below are my suggestions for specific
> > > > >> > > changes in Kafka to accommodate security requirements. This
> > > > >> > > comes from what folks are doing "in the wild" to work around and
> > > > >> > > implement security with Kafka as it is today and also what I
> > > > >> > > have discovered from organizations about their blockers. It also
> > > > >> > > picks up from the wiki (which I should have time to update later
> > > > >> > > in the week based on the below and feedback from the thread).
> > > > >> > >
> > > > >> > > 1) Transport Layer Security (i.e. SSL)
> > > > >> > >
> > > > >> > > This also includes client authentication in addition to the
> > > > >> > > in-transit security layer. This work has been picked up here
> > > > >> > > https://issues.apache.org/jira/browse/KAFKA-1477 and I do
> > > > >> > > appreciate any thoughts, comments, feedback, tomatoes, whatever
> > > > >> > > for this patch. It is a pickup from the fork of the work first
> > > > >> > > done here https://github.com/relango/kafka/tree/kafka_security.
> > > > >> > >
> > > > >> > > 2) Data encryption at rest.
> > > > >> > >
> > > > >> > > This is very important and something that can be facilitated
> > > > >> > > within the wire protocol. It requires an additional map data
> > > > >> > > structure for the "encrypted [data encryption key]". With this
> > > > >> > > map (either in your object or in the wire protocol) you can
> > > > >> > > store the dynamically generated symmetric key (for each message)
> > > > >> > > and then encrypt the data using that dynamically generated key.
> > > > >> > > You then encrypt the encryption key using the public key of each
> > > > >> > > party that is expected to be able to decrypt the encryption key
> > > > >> > > and then decrypt the message. Each public-key-encrypted
> > > > >> > > symmetric key (which is now the "encrypted [data encryption
> > > > >> > > key]") is stored along with the public key it was encrypted with
> > > > >> > > (so a map of [publicKey] = encryptedDataEncryptionKey) as a
> > > > >> > > chain. Other patterns can be implemented but this is a pretty
> > > > >> > > standard digital enveloping [0] pattern with only 1 field added.
> > > > >> > > Other patterns should be able to use that field to do their
> > > > >> > > implementation too.
> > > > >> > >
> > > > >> > > 3) Non-repudiation and long term non-repudiation.
> > > > >> > >
> > > > >> > > Non-repudiation is proving data hasn't changed. This is often
> > > > >> > > (if not always) done with x509 public certificates (chained to a
> > > > >> > > certificate authority).
> > > > >> > >
> > > > >> > > Long term non-repudiation is what happens when the certificates
> > > > >> > > of the certificate authority are expired (or revoked) and
> > > > >> > > everything ever signed (ever) with that certificate's public key
> > > > >> > > then becomes "no longer provable as ever being authentic". That
> > > > >> > > is where RFC3126 [1] and RFC3161 [2] come in (or worm drives
> > > > >> > > [hardware], etc).
> > > > >> > >
> > > > >> > > For either (or both) of these it is an operation of the
> > > > >> > > encryptor to sign/hash the data (with or without a third party
> > > > >> > > trusted timestamp of the signing event) and encrypt that with
> > > > >> > > their own private key and distribute the results (before and
> > > > >> > > after encrypting if required) along with their public key. This
> > > > >> > > structure is a bit more complex but feasible: it is a map of
> > > > >> > > digital signature formats and the chain of dig sig attestations.
> > > > >> > > The map's key is the method (i.e. CRC32, PKCS7 [3], XmlDigSig
> > > > >> > > [4]) and its value is a list of maps where the key is the
> > > > >> > > "purpose" of the signature (what you're attesting to). As a
> > > > >> > > sibling field to the list there is another field for "the
> > > > >> > > attester" as bytes (e.g. their PKCS12 [5] for the map of PKCS7
> > > > >> > > signatures).
> > > > >> > >
> > > > >> > > 4) Authorization
> > > > >> > >
> > > > >> > > We should have a policy of "404" for data, topics, partitions
> > > > >> > > (etc) if authenticated connections do not have access. In
> > > > >> > > "secure mode" any non-authenticated connections should get a
> > > > >> > > "404" type message on everything. Knowing "something is there"
> > > > >> > > is a security risk in many use cases. So if you don't have
> > > > >> > > access you don't even see it. Baking "that" into Kafka along
> > > > >> > > with some interface for entitlement (access management) systems
> > > > >> > > (pretty standard) is all that I think needs to be done to the
> > > > >> > > core project. I want to tackle this item later in the year,
> > > > >> > > after summer, once the other three are complete.
> > > > >> > >
> > > > >> > > I look forward to thoughts on this and anyone else interested in
> > > > >> > > working with us on these items.
> > > > >> > >
> > > > >> > > [0] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/what-is-a-digital-envelope.htm
> > > > >> > > [1] http://tools.ietf.org/html/rfc3126
> > > > >> > > [2] http://tools.ietf.org/html/rfc3161
> > > > >> > > [3] http://www.emc.com/emc-plus/rsa-labs/standards-initiatives/pkcs-7-cryptographic-message-syntax-standar.htm
> > > > >> > > [4] http://en.wikipedia.org/wiki/XML_Signature
> > > > >> > > [5] http://en.wikipedia.org/wiki/PKCS_12
> > > > >> > >
> > > > >> > > /*******************************************
> > > > >> > >  Joe Stein
> > > > >> > >  Founder, Principal Consultant
> > > > >> > >  Big Data Open Source Security LLC
> > > > >> > >  http://www.stealth.ly
> > > > >> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > >> > > ********************************************/
> > >
> > >
> > > --
> > > Thanks,
> > > Raja.
>
>
> --
> Thanks,
> Raja.