Re: Create KTable from two topics

2016-06-02 Thread Guozhang Wang
Hello Srikanth,

When involved in joins, a KTable needs to pass both the old value and the
new value as a pair to the join operator, since it is "an ever updating
table with the underlying changelog"; for example, your topic1 stream has
key "128073" with its value updated from 542361 to 560608. The pair
will be printed as "(new <- old)" when the table is printed directly; but if you
call "table.toStream.print", the library will ignore the old value in
"toStream" and only pass the new value to the "print" operator.

Now as for the duplicates: they actually come from that (new, old) pair, i.e.
if you remove "toStream" for your metadataKTable, I think you will see something
like:

128073 , null <- null

128073 , (542361,100710) <- (null, 100710)
...
128073 , (560608,100710) <- (542361, 100710)


Guozhang

On Thu, Jun 2, 2016 at 9:58 AM, Srikanth  wrote:

> I did try approach 3 yesterday with the following sample data.
>
> topic1:
> 127339  538433
> 131933  626026
> 128072  536012
> 128074  546262
> *123507  517631*
> 128073  542361
> 128073  560608
>
> topic2:
> 128074  100282
> 131933  100394
> 127339  100445
> 128073  100710
> *123507  100226*
>
> I joined these and printed the result
> val KTable1 = kStreamBuilder.table(intSerde, intSerde, "topic1")
> val KTable2 = kStreamBuilder.table(intSerde, intSerde, "topic2")
> val metadataKTable = KTable1.join(KTable2, (user, loc) => (user, loc) )
> metadataKTable.toStream().print()
>
> In the output I see each key being output twice. Didn't understand why?
>
> Started Streams Example.
> *123507 , (517631,100226)*
> 127339 , (538433,100445)
> 128073 , (542361,100710)
> 131933 , (626026,100394)
> 128073 , (560608,100710)
> 128072 , null
> 128074 , (546262,100282)
> 128074 , (546262,100282)
> 128073 , (560608,100710)
> *123507 , (517631,100226)*
> 131933 , (626026,100394)
> 127339 , (538433,100445)
> Finished Streams Example.
>
> If I store the joinedTable to an intermediate topic and read it back, I see
> duplicate records too.
> val metadataKTable = kStreamBuilder.table(intSerde, intSerde,
> metadataTopicName)
> metadataKTable.print()
>
> Started Streams Example.
> 538433 , (100445<-null)
> 546262 , (100282<-null)
> 546262 , (100282<-null)
> 538433 , (100445<-null)
> *517631 , (100226<-null)*
> *517631 , (100226<-null)*
> 542361 , (100710<-null)
> 560608 , (100710<-null)
> 560608 , (100710<-null)
> 626026 , (100394<-null)
> 626026 , (100394<-null)
> Finished Streams Example.
>
> BTW, what does the strange "<-null" in KTable.print mean?
>
> Srikanth
>
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax 
> wrote:
>
> > Hi Srikanth,
> >
> > your third approach seems to be the best fit. It uses only one shuffle
> > of the data (which you cannot prevent in any case).
> >
> > If you want to put everything into a single application, you could use a
> > "dummy" custom aggregation to convert the KStream into a KTable instead
> > of writing into a topic and reading it from a second application.
> >
> > val kTable = metadataKTable
> >  .toStream()
> >  .map((k,v) => new KeyValue(v._1, v._2))
> >  .through("intermediate topic")
> >  .aggregateByKey(...);
> >
> > The aggregate function just replaces the old value with the new value
> > (ie, not really performing an aggregation).
> >
> > -Matthias
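
(For concreteness, a rough Scala sketch of that dummy aggregation, reusing
metadataKTable and intSerde from the snippets in this thread; it uses
reduceByKey, a sibling of the aggregateByKey above, and the "user-loc-rekeyed"
topic and "user-loc-store" names are placeholders — exact 0.10.0 signatures
may differ slightly:)

// Hedged sketch: re-key the join result and fold it back into a KTable by
// keeping only the latest value seen per key (the "dummy" aggregation).
val userLocTable: KTable[Integer, Integer] = metadataKTable
  .toStream()
  .map((k: Integer, v: (Integer, Integer)) => new KeyValue(v._1, v._2))
  .through(intSerde, intSerde, "user-loc-rekeyed")           // placeholder topic name
  .reduceByKey((_: Integer, newValue: Integer) => newValue,  // keep the newest value
    intSerde, intSerde, "user-loc-store")                    // placeholder store name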
> >
> >
> > On 06/01/2016 08:03 PM, Srikanth wrote:
> > > Hello,
> > >
> > > How do I build a KTable from two topics such that key is in one topic
> and
> > > value in other?
> > >
> > > Ex,
> > > topic1 has a key called basekey and userId as value.
> > > topic2 has same basekey and locationId as value
> > >
> > > topic1 = {"basekey":1,"userId":111}
> > > topic1 = {"basekey":2,"userId":222}
> > >
> > > topic2 = {"basekey":1,"locId":888}
> > > topic2 = {"basekey":2,"locId":999}
> > >
> > > I want to build a KTable with userId as key and locationId as value.
> > > This KTable will be used to enrich a KStream that only has userId and
> > needs
> > > to be updated with locationId.
> > >
> > > val KTable1: KTable[Integer, Integer] =
> > kStreamBuilder.table(intSerde,
> > > intSerde, "topic1")  --> basekey is used as key
> > > val KTable2: KTable[Integer, Integer] =
> > kStreamBuilder.table(intSerde,
> > > intSerde, "topic2")  --> basekey is used as key
> > >
> > > val metadataKTable: KTable[Integer, Integer] =
> > >   KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc)
> )
> > >//.map((k,v) => (v._1, v._2) --> .map() is not supported
> > on
> > > KTable
> > >
> > > Problem is KTable doesn't have an API to update its key. It only has a
> > > mapValue().
> > > I guess since the key is used in underlying rocksDB, it isn't easy to
> > > change the key.
> > > I was exploring if I can pass it through() another topic using
> > > custom StreamPartitioner.
> > > That will let me partition using a field in value but still can't
> replace
> > > the key.
> > >
> > >
> > > Alternate one, is to join the KStream 

Re: Kafka encryption

2016-06-02 Thread Jim Hoagland
I'm hesitant to cite it because it wasn't really a proper benchmark, but
with the end-to-end encryption through Kafka proof of concept described at
http://symc.ly/1pC2CEG, doing the encryption added only 26% to the time
taken to send messages and only 6% to the time taken to consume messages.
This is with batching 200 300-byte messages together for encryption.  More
details are in the blog post.

Personally I think that encrypting sensitive data before handing it to
Kafka (or at least before it leaves the producing box) just makes sense to
do if the situation allows.  The Kafka installation wouldn't be able
reveal the data even if its systems and networks are compromised because
it never sees the data in the clear and doesn't know how to decrypt it.
In the way we set it up, someone would need the recipient's RSA private
key to decrypt (or would need to have compromised a decrypting system).
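
(As a rough illustration of encrypting before the data leaves the producing
box: a wrapping Serializer with a shared AES key. The proof of concept above
uses per-recipient RSA-wrapped keys instead; the class name and key handling
here are purely hypothetical:)

import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec
import org.apache.kafka.common.serialization.Serializer

// Hedged sketch: encrypt whatever the wrapped serializer produces, so the
// brokers only ever see ciphertext. The random CBC IV is prepended to the
// ciphertext so a matching deserializer can strip it off and decrypt.
class EncryptingSerializer[T](inner: Serializer[T], key: Array[Byte]) extends Serializer[T] {
  private val keySpec = new SecretKeySpec(key, "AES")

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit =
    inner.configure(configs, isKey)

  override def serialize(topic: String, data: T): Array[Byte] = {
    val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
    cipher.init(Cipher.ENCRYPT_MODE, keySpec)
    cipher.getIV ++ cipher.doFinal(inner.serialize(topic, data))
  }

  override def close(): Unit = inner.close()
}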

-- Jim


On 6/2/16, 2:56 AM, "Tom Crayford"  wrote:

>Filesystem encryption is transparent to Kafka. You don't need to use SSL,
>but your encryption requirements may cause you to need SSL as well.
>
>With regards to compression, without adding at rest encryption to Kafka
>(which is a very major piece of work, one that for sure requires a KIP and
>has many, many implications), there's not much to do there. I think it's
>worth examining your threat models that require encryption on disk without
>full disk encryption being suitable. Generally, compromised broker machines
>mean an attacker will be able to sniff in-flight traffic anyway; if the
>goal is to never leak messages even when an attacker has full control of the
>broker machine, I'd suggest that that seems pretty impossible under current
>operating environments.
>
>If the issue is compliance, I'd recommend querying whichever compliance
>standard you're operating under about the suitability of full disk
>encryption, and careful thought about encrypting the most sensitive parts
>of messages. Whilst encryption in the producer and consumer does lead to
>performance issues and decrease the capability of compression to shrink a
>dataset, doing partial encryption of messages is easy enough.
>
>Generally we've found that the kinds of uses of Kafka that require in
>message encryption (alongside full disk encryption and SSL which we
>provide
>as standard) don't have such high throughput needs that they worry about
>compression etc. That clearly isn't true for all use cases though.
>
>Thanks
>
>Tom Crayford
>Heroku Kafka
>
>On Thursday, 2 June 2016, Gerard Klijs  wrote:
>
>> You could add a header to every message, with information whether it's
>> encrypted or not, then you don't have to encrypt all the messages, or
>>you
>> only do it for some topics.
>>
>> On Thu, Jun 2, 2016 at 6:36 AM Bruno Rassaerts <
>> bruno.rassae...@novazone.be >
>> wrote:
>>
>> > It works indeed but encrypting individual messages really influences
>>the
>> > batch compression done by Kafka.
>> > Performance drops to about 1/3 of what it is without (even if we
>>prepare
>> > the encrypted samples upfront).
>> > In the end what we going for is only encrypting what we really really
>> need
>> > to encrypt, not every message systematically.
>> >
>> > > On 31 May 2016, at 13:00, Gerard Klijs > > wrote:
>> > >
>> > > If you want system administrators not being able to see the data,
>>the
>> > only
>> > > option is encryption, with only the clients sharing the key (or
>> whatever
>> > is
>> > > used to (de)crypt the data). Like the example from eugene. I don't
>>know
>> > the
>> > > kind of messages you have, but you could always wrap something
>>around
>> any
>> > > (de)serializer your currently using.
>> > >
>> > > On Tue, May 31, 2016 at 12:21 PM Bruno Rassaerts <
>> > > bruno.rassae...@novazone.be > wrote:
>> > >
>> > >> I’ve asked the same question in the past, and disk encryption was
>> > >> suggested as a solution as well.
>> > >> However, as far as I know, disk encryption will not prevent your
>>data
>> to
>> > >> be stolen when the machine is compromised.
>> > >> What we are looking for is even an additional barrier, so that even
>> > system
>> > >> administrators do not have access to the data.
>> > >> Any suggestions ?
>> > >>
>> > >>> On 24 May 2016, at 14:40, Tom Crayford > > wrote:
>> > >>>
>> > >>> Hi,
>> > >>>
>> > >>> There's no encryption at rest. It's recommended to use filesystem
>> > >>> encryption, or encryption of each individual message before
>>producing
>> > it
>> > >>> for this.
>> > >>>
>> > >>> Only the new producer and consumers have SSL support.
>> > >>>
>> > >>> Thanks
>> > >>>
>> > >>> Tom Crayford
>> > >>> Heroku Kafka
>> > >>>
>> > >>> On Tue, May 24, 2016 at 11:33 AM, Snehalata Nagaje <
>> > >>> snehalata.nag...@harbingergroup.com > wrote:
>> > >>>
>> > 
>> > 
>> >  Thanks for quick reply.
>> > 
>> >  Do you mean If I see messages in kafka, those will not be
>>readable?
>> > 
>> >  And also, we are using new producer but old consumer , does old

RE: Problematic messages in Kafka

2016-06-02 Thread Thakrar, Jayesh
Thanks for the quick reply Danny.

The message size as per the DumpLogSegments is around 59KB

I used a very high message.max.size and a high fetchsize of 1 MB (that's the 
message.max.size in the broker) and still the same hang behavior.
Also tried a max-wait-ms so that the consumer does not "hang" - but still the 
same result.

Here's what I used -

kafka-simple-consumer-shell.sh --broker-list $HOSTNAME:9092 --fetchsize 100 
--max-messages 10 --max-wait-ms 1 --offset 7207844650  --partition 0 
--print-offsets --topic RtbBid --property message.max.size=100


-Original Message-
From: Danny Bahir [mailto:dannyba...@gmail.com] 
Sent: Thursday, June 02, 2016 10:06 PM
To: users@kafka.apache.org
Subject: Re: Problematic messages in Kafka

quoting from https://cwiki.apache.org/confluence/display/KAFKA/FAQ

The high-level consumer will block if
the next message available is larger than the maximum fetch size you have 
specified

   - One possibility of a stalled consumer is that the fetch size in the
   consumer is smaller than the largest message in the broker. You can use the
   DumpLogSegments tool to figure out the largest message size and set
   fetch.size in the consumer config accordingly.


On Thu, Jun 2, 2016 at 3:41 PM, Thakrar, Jayesh < jthak...@conversantmedia.com> 
wrote:

> Wondering if anyone has encountered similar issues.
>
> Using Kafka 0.8.2.1.
>
> Occasionally, we encounter a situation in which a consumer (including
> kafka-console-consumer.sh) just hangs.
> If I increment the offset to skip the offending message, things work 
> fine again.
>
> I have been able to identify the message offset and the data file (log
> file) containing the message.
>
> However, using kafka.tools.DumpLogSegments, I can dump the message 
> using commands like this -
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log 
> --deep-iteration
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log 
> --print-data-log --deep-iteration
>
> From the DumpLogSegments program, here's the checksum info that I get -
> offset: 7207844652 position: 398291668 isvalid: true payloadsize: 
> 59041
> magic: 0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12
>
> So it looks like the message is ok, since there's also a CRC checksum.
> Has anyone encountered such an issue?
> Is there any explanation or reason for the broker behavior?
> I have the data/log file saved if there is any troubleshooting that 
> can be done.
>
> When the broker reads the message and it seems to hang forever, I have 
> to kill the console-consumer or our application consumer.
>
> When I do that, here's what I see in the broker's log file
>
> [2016-06-02 15:50:45,117] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:45,139] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 
> because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:724)
> [2016-06-02 15:50:49,936] INFO Closing socket connection to / 
> 10.110.105.134. (kafka.network.Processor)
> [2016-06-02 15:50:51,591] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:51,699] INFO Closing socket connection to / 
> 10.110.102.113. (kafka.network.Processor)
>
>
>
>
>
>
> This email and any files included with it may contain privileged, 
> proprietary and/or confidential information that is for the sole use 
> of the intended recipient(s).  Any disclosure, copying, distribution, 
> posting, or use of the information contained in or attached to this 
> email is prohibited unless permitted by the sender.  If you have 
> received this email in error, please immediately notify the sender via 
> return email, telephone, or fax and destroy this original transmission 
> and its included files without reading or saving it in any manner.
> Thank you.
>





Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread Steve Tian
I see.  I'm not sure if this is a known issue.  Do you mind sharing the
brokers/topics setup and the steps to reproduce this issue?

Cheers, Steve

On Fri, Jun 3, 2016, 9:45 AM safique ahemad  wrote:

> you got it right...
>
> But DialTimeout is not a concern here. Client try fetching metadata from
> Kafka brokers but Kafka give them stale metadata near 30-40 sec.
> It try to fetch 3-4 time in between until it get updated metadata.
> This is completely different problem than
> https://github.com/Shopify/sarama/issues/661
>
>
>
> On Thu, Jun 2, 2016 at 6:05 PM, Steve Tian 
> wrote:
>
> > So you are coming from https://github.com/Shopify/sarama/issues/661 ,
> > right?   I'm not sure if anything from broker side can help but looks
> like
> > you already found DialTimeout on client side can help?
> >
> > Cheers, Steve
> >
> > On Fri, Jun 3, 2016, 8:33 AM safique ahemad 
> wrote:
> >
> > > kafka version:0.9.0.0
> > > go sarama client version: 1.8
> > >
> > > On Thu, Jun 2, 2016 at 5:14 PM, Steve Tian 
> > > wrote:
> > >
> > > > Client version?
> > > >
> > > > On Fri, Jun 3, 2016, 4:44 AM safique ahemad 
> > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > We are using Kafka broker cluster in our data center.
> > > > > Recently, It is realized that when a Kafka broker goes down then
> > client
> > > > try
> > > > > to refresh the metadata but it get stale metadata upto near 30
> > seconds.
> > > > >
> > > > > After near 30-35 seconds, updated metadata is obtained by client.
> > This
> > > is
> > > > > really a large time for the client continuously gets send failure
> for
> > > so
> > > > > long.
> > > > >
> > > > > Kindly, reply if any configuration may help here or something else
> or
> > > > > required.
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Regards,
> > > > > Safique Ahemad
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > >
> > > Regards,
> > > Safique Ahemad
> > > GlobalLogic | Leaders in software R&D services
> > > P :+91 120 4342000-2990 | M:+91 9953533367
> > > www.globallogic.com
> > >
> >
>
>
>
> --
>
> Regards,
> Safique Ahemad
> GlobalLogic | Leaders in software R&D services
> P :+91 120 4342000-2990 | M:+91 9953533367
> www.globallogic.com
>


Re: Problematic messages in Kafka

2016-06-02 Thread Danny Bahir
quoting from https://cwiki.apache.org/confluence/display/KAFKA/FAQ

The high-level consumer will block if
the next message available is larger than the maximum fetch size you have
specified

   - One possibility of a stalled consumer is that the fetch size in the
   consumer is smaller than the largest message in the broker. You can use the
   DumpLogSegments tool to figure out the largest message size and set
   fetch.size in the consumer config accordingly.


On Thu, Jun 2, 2016 at 3:41 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Wondering if anyone has encountered similar issues.
>
> Using Kafka 0.8.2.1.
>
> Occasionally, we encounter a situation in which a consumer (including
> kafka-console-consumer.sh) just hangs.
> If I increment the offset to skip the offending message, things work fine
> again.
>
> I have been able to identify the message offset and the data file (log
> file) containing the message.
>
> However, using kafka.tools.DumpLogSegments, I can dump the message using
> commands like this -
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log
> --deep-iteration
>
> /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh
> kafka.tools.DumpLogSegments --files 007207840027.log
> --print-data-log --deep-iteration
>
> From the DumpLogSegments program, here's the checksum info that I get -
> offset: 7207844652 position: 398291668 isvalid: true payloadsize: 59041
> magic: 0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12
>
> So it looks like the message is ok, since there's also a CRC checksum.
> Has anyone encountered such an issue?
> Is there any explanation or reason for the broker behavior?
> I have the data/log file saved if there is any troubleshooting that can be
> done.
>
> When the broker reads the message and it seems to hang forever, I have to
> kill the console-consumer or our application consumer.
>
> When I do that, here's what I see in the broker's log file
>
> [2016-06-02 15:50:45,117] INFO Closing socket connection to /
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:45,139] INFO Closing socket connection to /
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 because
> of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:724)
> [2016-06-02 15:50:49,936] INFO Closing socket connection to /
> 10.110.105.134. (kafka.network.Processor)
> [2016-06-02 15:50:51,591] INFO Closing socket connection to /
> 10.110.102.113. (kafka.network.Processor)
> [2016-06-02 15:50:51,699] INFO Closing socket connection to /
> 10.110.102.113. (kafka.network.Processor)
>
>
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>


Re: Dynamic bootstrap.servers with multiple data centers

2016-06-02 Thread Danny Bahir
Yes, I'm in.

Sent from my iPhone

> On Jun 2, 2016, at 8:32 AM, Ismael Juma  wrote:
> 
> Hi Danny,
> 
> A KIP has not been drafted for that yet. Would you be interested in working
> on it?
> 
> Ismael
> 
>> On Thu, Jun 2, 2016 at 1:15 PM, Danny Bahir  wrote:
>> 
>> Thanks Ben.
>> 
>> The comments on the Jira mention a pluggable component that will manage
>> the bootstrap list from a discovery service.
>> 
>> That's exactly what I need.
>> 
>> Was a Kip drafted for this enhancement?
>> 
>> -Danny
>> 
>>> On Jun 1, 2016, at 7:05 AM, Ben Stopford  wrote:
>>> 
>>> Hey Danny
>>> 
>>> Currently the bootstrap servers are only used when the client
>> initialises (there’s a bit of discussion around the issue in the jira below
>> if you’re interested). To implement failover you’d need to catch a timeout
>> exception in your client code, consulting your service discovery mechanism
>> and reinitialise the client.
>>> 
>>> KAFKA-3068 
>>> 
>>> B
>>> 
 On 31 May 2016, at 22:09, Danny Bahir  wrote:
 
 Hello,
 
 Working on a multi data center Kafka installation in which all clusters
>> have the same topics, the producers will be able to connect to any of the
>> clusters. Would like the ability to dynamically control the set of clusters
>> a producer will be able to connect to, that will allow to gracefully take a
>> cluster offline for maintenance.
 Current design is to have one zk cluster that is across all data
>> centers and will have info regarding what in which cluster a service is
>> available.
 
 In the case of Kafka it will house the info needed to populate
>> bootstrap.servers, a wrapper will be placed around the Kafka producer and
>> will watch this ZK value. When the value will change the producer instance
>> with the old value will be shut down and a new producer with the new
>> bootstrap.servers info will replace it.
 
 Is there a best practice for achieving this?
 
 Is there a way to dynamically update bootstrap.servers?
 
 Does the producer always go to the same machine from bootstrap.servers
>> when it refreshes the MetaData after metadata.max.age.ms has expired?
 
 Thanks!
>> 
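
(A rough Scala sketch of the wrapper described above; the class and method
names are made up, and the ZooKeeper watch wiring is left out:)

import java.util.Properties
import java.util.concurrent.atomic.AtomicReference
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hedged sketch: swap in a fresh producer whenever service discovery reports
// a new bootstrap list; the old one is closed after in-flight sends finish.
class SwappableProducer(initialServers: String) {
  private val current = new AtomicReference(build(initialServers))

  private def build(servers: String): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  // Call this from the ZooKeeper watch when the cluster list changes.
  def onBootstrapServersChanged(newServers: String): Unit =
    current.getAndSet(build(newServers)).close()

  def send(topic: String, key: String, value: String): Unit = {
    current.get().send(new ProducerRecord[String, String](topic, key, value))
    ()
  }
}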


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-02 Thread Christian Posta
Hate to bring up "non-flashy" technology... but Apache Camel would be a
great fit for something like this. Two Java libraries, each with very strong
suits.



On Thu, Jun 2, 2016 at 6:09 PM, Avi Flax  wrote:

> On 6/2/16, 07:03, "Eno Thereska"  wrote:
>
> > Using the low-level streams API you can definitely read or write to
> arbitrary
> > locations inside the process() method.
>
> Ah, good to know — thank you!
>
> > However, back to your original question: even with the low-level streams
> > API the sources and sinks can only be Kafka topics for now. So, as Gwen
> > mentioned, Connect would be the way to go to bring the data to a Kafka
> > Topic first.
>
> Got it — thank you!
>
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread safique ahemad
you got it right...

But DialTimeout is not a concern here. The client tries fetching metadata from
the Kafka brokers, but Kafka gives it stale metadata for nearly 30-40 sec.
It tries to fetch 3-4 times in between until it gets updated metadata.
This is a completely different problem from
https://github.com/Shopify/sarama/issues/661



On Thu, Jun 2, 2016 at 6:05 PM, Steve Tian  wrote:

> So you are coming from https://github.com/Shopify/sarama/issues/661 ,
> right?   I'm not sure if anything from broker side can help but looks like
> you already found DialTimeout on client side can help?
>
> Cheers, Steve
>
> On Fri, Jun 3, 2016, 8:33 AM safique ahemad  wrote:
>
> > kafka version:0.9.0.0
> > go sarama client version: 1.8
> >
> > On Thu, Jun 2, 2016 at 5:14 PM, Steve Tian 
> > wrote:
> >
> > > Client version?
> > >
> > > On Fri, Jun 3, 2016, 4:44 AM safique ahemad 
> > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We are using Kafka broker cluster in our data center.
> > > > Recently, It is realized that when a Kafka broker goes down then
> client
> > > try
> > > > to refresh the metadata but it get stale metadata upto near 30
> seconds.
> > > >
> > > > After near 30-35 seconds, updated metadata is obtained by client.
> This
> > is
> > > > really a large time for the client continuously gets send failure for
> > so
> > > > long.
> > > >
> > > > Kindly, reply if any configuration may help here or something else or
> > > > required.
> > > >
> > > >
> > > > --
> > > >
> > > > Regards,
> > > > Safique Ahemad
> > > >
> > >
> >
> >
> >
> > --
> >
> > Regards,
> > Safique Ahemad
> > GlobalLogic | Leaders in software R&D services
> > P :+91 120 4342000-2990 | M:+91 9953533367
> > www.globallogic.com
> >
>



-- 

Regards,
Safique Ahemad
GlobalLogic | Leaders in software R&D services
P :+91 120 4342000-2990 | M:+91 9953533367
www.globallogic.com


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-02 Thread Avi Flax
On 6/2/16, 07:03, "Eno Thereska"  wrote:

> Using the low-level streams API you can definitely read or write to arbitrary
> locations inside the process() method.

Ah, good to know — thank you!

> However, back to your original question: even with the low-level streams
> API the sources and sinks can only be Kafka topics for now. So, as Gwen
> mentioned, Connect would be the way to go to bring the data to a Kafka
> Topic first.

Got it — thank you!



Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread Steve Tian
So you are coming from https://github.com/Shopify/sarama/issues/661 ,
right?   I'm not sure if anything from broker side can help but looks like
you already found DialTimeout on client side can help?

Cheers, Steve

On Fri, Jun 3, 2016, 8:33 AM safique ahemad  wrote:

> kafka version:0.9.0.0
> go sarama client version: 1.8
>
> On Thu, Jun 2, 2016 at 5:14 PM, Steve Tian 
> wrote:
>
> > Client version?
> >
> > On Fri, Jun 3, 2016, 4:44 AM safique ahemad 
> wrote:
> >
> > > Hi All,
> > >
> > > We are using Kafka broker cluster in our data center.
> > > Recently, It is realized that when a Kafka broker goes down then client
> > try
> > > to refresh the metadata but it get stale metadata upto near 30 seconds.
> > >
> > > After near 30-35 seconds, updated metadata is obtained by client. This
> is
> > > really a large time for the client continuously gets send failure for
> so
> > > long.
> > >
> > > Kindly, reply if any configuration may help here or something else or
> > > required.
> > >
> > >
> > > --
> > >
> > > Regards,
> > > Safique Ahemad
> > >
> >
>
>
>
> --
>
> Regards,
> Safique Ahemad
> GlobalLogic | Leaders in software R&D services
> P :+91 120 4342000-2990 | M:+91 9953533367
> www.globallogic.com
>


Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread safique ahemad
kafka version:0.9.0.0
go sarama client version: 1.8

On Thu, Jun 2, 2016 at 5:14 PM, Steve Tian  wrote:

> Client version?
>
> On Fri, Jun 3, 2016, 4:44 AM safique ahemad  wrote:
>
> > Hi All,
> >
> > We are using Kafka broker cluster in our data center.
> > Recently, It is realized that when a Kafka broker goes down then client
> try
> > to refresh the metadata but it get stale metadata upto near 30 seconds.
> >
> > After near 30-35 seconds, updated metadata is obtained by client. This is
> > really a large time for the client continuously gets send failure for so
> > long.
> >
> > Kindly, reply if any configuration may help here or something else or
> > required.
> >
> >
> > --
> >
> > Regards,
> > Safique Ahemad
> >
>



-- 

Regards,
Safique Ahemad
GlobalLogic | Leaders in software R&D services
P :+91 120 4342000-2990 | M:+91 9953533367
www.globallogic.com


Re: Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread Steve Tian
Client version?

On Fri, Jun 3, 2016, 4:44 AM safique ahemad  wrote:

> Hi All,
>
> We are using Kafka broker cluster in our data center.
> Recently, It is realized that when a Kafka broker goes down then client try
> to refresh the metadata but it get stale metadata upto near 30 seconds.
>
> After near 30-35 seconds, updated metadata is obtained by client. This is
> really a large time for the client continuously gets send failure for so
> long.
>
> Kindly, reply if any configuration may help here or something else or
> required.
>
>
> --
>
> Regards,
> Safique Ahemad
>


Re: Unavailable Partitions and Uneven ISR

2016-06-02 Thread Russ Lavoie
Have you verified that the old leader of the partition is using the same
ID as before?  Check in zk /brokers/ids to get a list of available
brokers.  I would use the reassignment tool to move partition 3 to brokers
in the list from zk (specifying only 3 brokers).  Make sure to include
broker with ID 1 since it was the only ISR in the list before restart.  I
would start there
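
(For reference, the reassignment could look roughly like this; the ZooKeeper
connect string is a placeholder, and broker 1 is listed first since it was the
only in-sync replica:)

reassign.json:
{"version":1,"partitions":[{"topic":"topic1","partition":3,"replicas":[1,0,2]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute
# afterwards: rerun with --verify instead of --execute to confirm completion
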
On Jun 1, 2016 11:40 AM, "Tushar Agrawal"  wrote:

> Hi,
>
> We have 5 brokers running on 0.9.0.1 with 5 ZK. This morning, multiple
> topics were having "unavailable-partitions" (whose leader is not
> available). After looking at multiple logs, forums and google results, we
> finally restarted all the brokers one by one and issue seems to be
> resolved.
>
> However, for that particular partition now we have "five" ISR instead of
> "3".  What should we do to fix this issue?
>
>
> *Before restart*
>
> Topic:topic1 PartitionCount:8 ReplicationFactor:3 Configs:retention.ms
> =25920
> Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,3,4 Isr: 0,3,4
> Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,4,0 Isr: 0,1,4
> Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 1,2,0
> Topic: topic1 Partition: 3 Leader: -1 Replicas: 0,1,2,3,4 Isr: 1
> Topic: topic1 Partition: 4 Leader: 4 Replicas: 4,2,3 Isr: 4,3,2
> Topic: topic1 Partition: 5 Leader: 1 Replicas: 0,4,1 Isr: 1,4,0
> Topic: topic1 Partition: 6 Leader: 2 Replicas: 1,0,2 Isr: 2,0,1
> Topic: topic1 Partition: 7 Leader: 3 Replicas: 2,1,3 Isr: 3,2,1
>
> *After restart*
>
> Topic:topic1 PartitionCount:8 ReplicationFactor:3 Configs:retention.ms
> =25920
> Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,3,4 Isr: 0,3,4
> Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,4,0 Isr: 0,1,4
> Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
> Topic: topic1 Partition: 3 Leader: 0 Replicas: 0,1,2,3,4 Isr: 0,1,2,3,4
> Topic: topic1 Partition: 4 Leader: 4 Replicas: 4,2,3 Isr: 3,4,2
> Topic: topic1 Partition: 5 Leader: 0 Replicas: 0,4,1 Isr: 0,1,4
> Topic: topic1 Partition: 6 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
> Topic: topic1 Partition: 7 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
>
> Thank you,
> Tushar
>


Re: broker randomly shuts down

2016-06-02 Thread Russ Lavoie
What about in dmesg?  I have run into this issue and it was the OOM
killer.  I also ran into a heap issue using too much of the direct memory
(JVM).  Reducing the fetcher threads helped with that problem.
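
(Two quick checks along those lines, both illustrative — the OOM killer leaves
a trace in the kernel log, and the replica fetcher thread count is the broker's
num.replica.fetchers setting:)

dmesg | grep -iE "killed process|out of memory"   # OOM killer leaves a trace here
# broker setting that controls the replica fetcher thread count:
# num.replica.fetchers=<n>
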
On Jun 2, 2016 12:19 PM, "allen chan"  wrote:

> Hi Tom,
>
> That is one of the first things that i checked. Active memory never goes
> above 50% of overall available. File cache uses the rest of the memory but
> i do not think that causes OOM killer.
> Either way there is no entries in /var/log/messages (centos) to show OOM is
> happening.
>
> Thanks
>
> On Thu, Jun 2, 2016 at 5:36 AM, Tom Crayford  wrote:
>
> > That looks like somebody is killing the process. I'd suspect either the
> > linux OOM killer or something else automatically killing the JVM for some
> > reason.
> >
> > For the OOM killer, assuming you're on ubuntu, it's pretty easy to find
> in
> > /var/log/syslog (depending on your setup). I don't know about other
> > operating systems.
> >
> > On Thu, Jun 2, 2016 at 5:54 AM, allen chan  >
> > wrote:
> >
> > > I have an issue where my brokers would randomly shut itself down.
> > > I turned on debug in log4j.properties but still do not see a reason why
> > the
> > > shutdown is happening.
> > >
> > > Anyone seen this behavior before?
> > >
> > > version 0.10.0
> > > log4j.properties
> > > log4j.rootLogger=DEBUG, kafkaAppender
> > > * I tried TRACE level but i do not see any additional log messages
> > >
> > > snippet of log around shutdown
> > > [2016-06-01 15:11:51,374] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:11:53,376] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:11:55,377] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:11:57,380] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:11:59,383] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:01,386] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:03,389] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > > Removed 0 expired offsets in 0 milliseconds.
> > > (kafka.coordinator.GroupMetadataManager)
> > > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > > Removed 0 expired offsets in 0 milliseconds.
> > > (kafka.coordinator.GroupMetadataManager)
> > > [2016-06-01 15:12:05,390] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:07,393] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:09,396] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:11,399] DEBUG Got ping response for sessionid:
> > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > > (kafka.server.KafkaServer)
> > > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > > (kafka.server.KafkaServer)
> > > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > > shutdown (kafka.server.KafkaServer)
> > > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > > shutdown (kafka.server.KafkaServer)
> > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> > connections-closed:
> > > (org.apache.kafka.common.metrics.Metrics)
> > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> > connections-created:
> > > (org.apache.kafka.common.metrics.Metrics)
> > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> > bytes-sent-received:
> > > (org.apache.kafka.common.metrics.Metrics)
> > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name bytes-sent:
> > > (org.apache.kafka.common.metrics.Metrics)
> > > [2016-06-01 15:12:13,339] DEBUG Added sensor with name bytes-received:
> > > (org.apache.kafka.common.metrics.Metrics)
> > > [2016-06-01 15:12:13,339] DEBUG Added sensor with name select-time:
> > > (org.apache.kafka.common.metrics.Metrics)
> > >
> > > --
> > > Allen Michael Chan
> > >
> >
>
>
>
> --
> Allen Michael Chan
>


Kafka take too long to update the client with metadata when a broker is gone

2016-06-02 Thread safique ahemad
Hi All,

We are using a Kafka broker cluster in our data center.
Recently, we realized that when a Kafka broker goes down, the client tries
to refresh the metadata but gets stale metadata for up to nearly 30 seconds.

After about 30-35 seconds, the client obtains the updated metadata. This is
a really long time, during which the client continuously gets send failures.

Kindly reply if any configuration may help here, or if anything else is
required.


-- 

Regards,
Safique Ahemad


Problematic messages in Kafka

2016-06-02 Thread Thakrar, Jayesh
Wondering if anyone has encountered similar issues.

Using Kafka 0.8.2.1.

Occasionally, we encounter a situation in which a consumer (including 
kafka-console-consumer.sh) just hangs.
If I increment the offset to skip the offending message, things work fine again.

I have been able to identify the message offset and the data file (log file) 
containing the message.

However, using kafka.tools.DumpLogSegments, I can dump the message using 
commands like this -

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 007207840027.log --deep-iteration

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments --files 007207840027.log --print-data-log 
--deep-iteration

From the DumpLogSegments program, here's the checksum info that I get -
offset: 7207844652 position: 398291668 isvalid: true payloadsize: 59041 magic: 
0 compresscodec: NoCompressionCodec crc: 186430976 keysize: 12

So it looks like the message is ok, since there's also a CRC checksum.
Has anyone encountered such an issue?
Is there any explanation or reason for the broker behavior?
I have the data/log file saved if there is any troubleshooting that can be done.

When the broker reads the message and it seems to hang forever, I have to kill 
the console-consumer or our application consumer.

When I do that, here's what I see in the broker's log file

[2016-06-02 15:50:45,117] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:45,139] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:49,142] ERROR Closing socket for /10.110.100.46 because of 
error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:724)
[2016-06-02 15:50:49,936] INFO Closing socket connection to /10.110.105.134. 
(kafka.network.Processor)
[2016-06-02 15:50:51,591] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)
[2016-06-02 15:50:51,699] INFO Closing socket connection to /10.110.102.113. 
(kafka.network.Processor)






This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.


Re: Restoring Kafka data to one broker

2016-06-02 Thread Meghana Narasimhan
Hi All,
Any suggestions or inputs on this ? Any help would be greatly appreciated.

Thanks,
Meghana

On Wed, Jun 1, 2016 at 3:01 PM, Meghana Narasimhan <
mnarasim...@bandwidth.com> wrote:

> Hi,
> I have a 3 node cluster with kafka version 0.9.0.1 with many topics having
> replication factor 3 and isr 2.
> Node 0 is running multiple mirrormakers.
>
> Node 1 in this cluster has an issue and lost all data from disk (All kafka
> data logs got deleted from disk).
> Kafka broker is down but zookeeper is still running and its data folder is
> available on this node.
>
> Node 0 and 2 are still alive and processing data.
>
> -Any inputs or suggestions on the best way to restore Node 1 and replicate
> say 20TB of data from Node 0 and 2 ?
> -Will just restarting the Kafka server replicate data to this node from
> Node 0 and 2 ? (This seemed to work for a small volume of data, but
> wondering if this is the right approach to take for larger volume of data)
> -What are the best practices in terms of configuration parameters for
> broker and mirrormaker that need to be verified in this scenario ?
>
> Thanks,
> Meghana
>
>


Re: Kafka broker slow down when consumer try to fetch large messages from topic

2016-06-02 Thread Tom Crayford
The article says ideal is about 10KB, which holds up well with what we've
seen in practice as well.

On Thu, Jun 2, 2016 at 6:25 PM, prateek arora 
wrote:

> Hi
> Thanks for the information .
>
> I have one question :
>
> Right now in my scenario  maximum message size is around 800KB . did we
> consider these messages in large size categories , because article told
> about 10-100 MB data .
>
> Regards
> Prateek
>
>
>
>
>
>
> On Thu, Jun 2, 2016 at 6:54 AM, Tom Crayford  wrote:
>
> > Hi there,
> >
> > Firstly, a note that Kafka isn't really designed for this kind of large
> > message. http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> > covers a lot of tips around this use case however, and covers some tuning
> > that will likely improve your usage.
> >
> > In particular, I expect tuning up fetch.message.max.bytes on the consumer
> > to help out a lot here.
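
(fetch.message.max.bytes is the old-consumer property; a rough sketch of the
relevant settings, sized to the 10 MB message.max.bytes in the setup quoted
below — the new consumer's equivalent is max.partition.fetch.bytes:)

# old consumer
fetch.message.max.bytes=10485760
# new consumer (0.9+) equivalent
max.partition.fetch.bytes=10485760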
> >
> > Generally though, doing large messages will lead to very low throughput
> and
> > lots of stability issues, as noted in that article. We run thousands of
> > clusters in production, and typically recommend folk keep message sizes
> > down to the few tens of KB for most use cases.
> >
> > Thanks
> >
> > Tom Crayford
> > Heroku Kafka
> >
> > On Wed, Jun 1, 2016 at 9:49 PM, prateek arora <
> prateek.arora...@gmail.com>
> > wrote:
> >
> > > I have 4 node kafka broker with following configuration :
> > >
> > > Default Number of Partitions  : num.partitions : 1
> > > Default Replication Factor : default.replication.factor : 1
> > > Maximum Message Size : message.max.bytes : 10 MB
> > > Replica Maximum Fetch Size : replica.fetch.max.bytes : 10 MB
> > >
> > >
> > > Right now I have 4 topic with 1 partition and 1 replication factor .
> > >
> > > "Topic Name" : "Broker Id" :  "Total Messages Received Across Kafka
> > > Broker" : "Total Bytes Received Across Kafka Broker"
> > > Topic 1  - Leader Kafka Broker 1 :  4.67 Message/Second  :  1.6
> MB/second
> > > Topic 2  - Leader Kafka Broker 2 :  4.78 Message/Second  :  4.1
> MB/second
> > > Topic 3  - Leader Kafka Broker 1 :  4.83  Message/Second   : 1.6
> > MB/second
> > > Topic 4  - Leader Kafka Broker 3  : 4.8 Message/Second   :   4.3
> > MB/second
> > >
> > > Message consist of .
> > >
> > >
> > > when consumer tried to read message from "Topic 2"  Kafka Broker rate
> of
> > >  message receiving slow down from 4.77 message/second to 3.12
> > > message/second  , after some time  try to goes up .
> > >
> > > I also attached screenshot of "Total Messages Received Across Kafka
> > > Broker"  and "Total Bytes Received Across Kafka Broker" for topic
> "Topic
> > > 2" .
> > >
> > > can someone explain why it is happen and how to solve it ?
> > >
> > > Regards
> > > Prateek
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> >
>


Re: Kafka broker slow down when consumer try to fetch large messages from topic

2016-06-02 Thread prateek arora
Hi
Thanks for the information .

I have one question :

Right now in my scenario the maximum message size is around 800 KB. Would you
consider these messages to be in the large-size category, given that the
article talks about 10-100 MB data?

Regards
Prateek






On Thu, Jun 2, 2016 at 6:54 AM, Tom Crayford  wrote:

> Hi there,
>
> Firstly, a note that Kafka isn't really designed for this kind of large
> message. http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> covers a lot of tips around this use case however, and covers some tuning
> that will likely improve your usage.
>
> In particular, I expect tuning up fetch.message.max.bytes on the consumer
> to help out a lot here.
>
> Generally though, doing large messages will lead to very low throughput and
> lots of stability issues, as noted in that article. We run thousands of
> clusters in production, and typically recommend folk keep message sizes
> down to the few tens of KB for most use cases.
>
> Thanks
>
> Tom Crayford
> Heroku Kafka
>
> On Wed, Jun 1, 2016 at 9:49 PM, prateek arora 
> wrote:
>
> > I have 4 node kafka broker with following configuration :
> >
> > Default Number of Partitions  : num.partitions : 1
> > Default Replication Factor : default.replication.factor : 1
> > Maximum Message Size : message.max.bytes : 10 MB
> > Replica Maximum Fetch Size : replica.fetch.max.bytes : 10 MB
> >
> >
> > Right now I have 4 topic with 1 partition and 1 replication factor .
> >
> > "Topic Name" : "Broker Id" :  "Total Messages Received Across Kafka
> > Broker" : "Total Bytes Received Across Kafka Broker"
> > Topic 1  - Leader Kafka Broker 1 :  4.67 Message/Second  :  1.6 MB/second
> > Topic 2  - Leader Kafka Broker 2 :  4.78 Message/Second  :  4.1 MB/second
> > Topic 3  - Leader Kafka Broker 1 :  4.83  Message/Second   : 1.6
> MB/second
> > Topic 4  - Leader Kafka Broker 3  : 4.8 Message/Second   :   4.3
> MB/second
> >
> > Message consist of .
> >
> >
> > when consumer tried to read message from "Topic 2"  Kafka Broker rate of
> >  message receiving slow down from 4.77 message/second to 3.12
> > message/second  , after some time  try to goes up .
> >
> > I also attached screenshot of "Total Messages Received Across Kafka
> > Broker"  and "Total Bytes Received Across Kafka Broker" for topic "Topic
> > 2" .
> >
> > can someone explain why it is happen and how to solve it ?
> >
> > Regards
> > Prateek
> >
> >
> >
> >
> >
> >
> >
>


Re: broker randomly shuts down

2016-06-02 Thread allen chan
Hi Tom,

That is one of the first things that I checked. Active memory never goes
above 50% of overall available. File cache uses the rest of the memory, but
I do not think that causes the OOM killer.
Either way, there are no entries in /var/log/messages (CentOS) to show OOM is
happening.

Thanks

On Thu, Jun 2, 2016 at 5:36 AM, Tom Crayford  wrote:

> That looks like somebody is killing the process. I'd suspect either the
> linux OOM killer or something else automatically killing the JVM for some
> reason.
>
> For the OOM killer, assuming you're on ubuntu, it's pretty easy to find in
> /var/log/syslog (depending on your setup). I don't know about other
> operating systems.
>
> On Thu, Jun 2, 2016 at 5:54 AM, allen chan 
> wrote:
>
> > I have an issue where my brokers would randomly shut itself down.
> > I turned on debug in log4j.properties but still do not see a reason why
> the
> > shutdown is happening.
> >
> > Anyone seen this behavior before?
> >
> > version 0.10.0
> > log4j.properties
> > log4j.rootLogger=DEBUG, kafkaAppender
> > * I tried TRACE level but i do not see any additional log messages
> >
> > snippet of log around shutdown
> > [2016-06-01 15:11:51,374] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:11:53,376] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:11:55,377] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:11:57,380] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:11:59,383] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:01,386] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:03,389] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > Removed 0 expired offsets in 0 milliseconds.
> > (kafka.coordinator.GroupMetadataManager)
> > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > Removed 0 expired offsets in 0 milliseconds.
> > (kafka.coordinator.GroupMetadataManager)
> > [2016-06-01 15:12:05,390] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:07,393] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:09,396] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:11,399] DEBUG Got ping response for sessionid:
> > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > (kafka.server.KafkaServer)
> > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > (kafka.server.KafkaServer)
> > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > shutdown (kafka.server.KafkaServer)
> > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > shutdown (kafka.server.KafkaServer)
> > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> connections-closed:
> > (org.apache.kafka.common.metrics.Metrics)
> > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> connections-created:
> > (org.apache.kafka.common.metrics.Metrics)
> > [2016-06-01 15:12:13,338] DEBUG Added sensor with name
> bytes-sent-received:
> > (org.apache.kafka.common.metrics.Metrics)
> > [2016-06-01 15:12:13,338] DEBUG Added sensor with name bytes-sent:
> > (org.apache.kafka.common.metrics.Metrics)
> > [2016-06-01 15:12:13,339] DEBUG Added sensor with name bytes-received:
> > (org.apache.kafka.common.metrics.Metrics)
> > [2016-06-01 15:12:13,339] DEBUG Added sensor with name select-time:
> > (org.apache.kafka.common.metrics.Metrics)
> >
> > --
> > Allen Michael Chan
> >
>



-- 
Allen Michael Chan


Re: Create KTable from two topics

2016-06-02 Thread Srikanth
I did try approach 3 yesterday with the following sample data.

topic1:
127339  538433
131933  626026
128072  536012
128074  546262
*123507  517631*
128073  542361
128073  560608

topic2:
128074  100282
131933  100394
127339  100445
128073  100710
*123507  100226*

I joined these and printed the result
val KTable1 = kStreamBuilder.table(intSerde, intSerde, "topic1")
val KTable2 = kStreamBuilder.table(intSerde, intSerde, "topic2")
val metadataKTable = KTable1.join(KTable2, (user, loc) => (user, loc) )
metadataKTable.toStream().print()

In the output I see each key being output twice. Didn't understand why?

Started Streams Example.
*123507 , (517631,100226)*
127339 , (538433,100445)
128073 , (542361,100710)
131933 , (626026,100394)
128073 , (560608,100710)
128072 , null
128074 , (546262,100282)
128074 , (546262,100282)
128073 , (560608,100710)
*123507 , (517631,100226)*
131933 , (626026,100394)
127339 , (538433,100445)
Finished Streams Example.

If I store the joinedTable to an intermediate topic and read it back, I see
duplicate records too.
val metadataKTable = kStreamBuilder.table(intSerde, intSerde,
metadataTopicName)
metadataKTable.print()

Started Streams Example.
538433 , (100445<-null)
546262 , (100282<-null)
546262 , (100282<-null)
538433 , (100445<-null)
*517631 , (100226<-null)*
*517631 , (100226<-null)*
542361 , (100710<-null)
560608 , (100710<-null)
560608 , (100710<-null)
626026 , (100394<-null)
626026 , (100394<-null)
Finished Streams Example.

BTW, what does the strange "<-null" in KTable.print mean?

Srikanth

On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax 
wrote:

> Hi Srikanth,
>
> your third approach seems to be the best fit. It uses only one shuffle
> of the data (which you cannot prevent in any case).
>
> If you want to put everything into a single application, you could use a
> "dummy" custom aggregation to convert the KStream into a KTable instead
> of writing into a topic and reading it from a second application.
>
> val kTable = metadataKTable
>  .toStream()
>  .map((k,v) => new KeyValue(v._1, v._2))
>  .through("intermediate topic")
>  .aggregateByKey(...);
>
> The aggregate function just replaces the old value with the new value
> (ie, not really performing an aggregation).
>
> -Matthias
>
>
> On 06/01/2016 08:03 PM, Srikanth wrote:
> > Hello,
> >
> > How do I build a KTable from two topics such that key is in one topic and
> > value in other?
> >
> > Ex,
> > topic1 has a key called basekey and userId as value.
> > topic2 has same basekey and locationId as value
> >
> > topic1 = {"basekey":1,"userId":111}
> > topic1 = {"basekey":2,"userId":222}
> >
> > topic2 = {"basekey":1,"locId":888}
> > topic2 = {"basekey":2,"locId":999}
> >
> > I want to build a KTable with userId as key and locationId as value.
> > This KTable will be used to enrich a KStream that only has userId and
> needs
> > to be updated with locationId.
> >
> > val KTable1: KTable[Integer, Integer] =
> kStreamBuilder.table(intSerde,
> > intSerde, "topic1")  --> basekey is used as key
> > val KTable2: KTable[Integer, Integer] =
> kStreamBuilder.table(intSerde,
> > intSerde, "topic2")  --> basekey is used as key
> >
> > val metadataKTable: KTable[Integer, Integer] =
> >   KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc) )
> >//.map((k,v) => (v._1, v._2) --> .map() is not supported
> on
> > KTable
> >
> > Problem is KTable doesn't have an API to update its key. It only has a
> > mapValue().
> > I guess since the key is used in underlying rocksDB, it isn't easy to
> > change the key.
> > I was exploring if I can pass it through() another topic using
> > custom StreamPartitioner.
> > That will let me partition using a field in value but still can't replace
> > the key.
> >
> >
> > Alternate one, is to join the KStream with topic1 to get "basekey". Then
> > join it again with topic2 to get locationId.
> > This will cause KStream to be shuffled twice.
> >
> >
> > Alternate two, is to have this logic as a separate topology. That will
> > write metadata to a topic.
> > val metadataKStream = metadataKTable.toStream()
> > .map((k,v) => new
> > KeyValue(v._1, v._2))
> > .to("intermediate topic")
> >
> > Another topology will read the stream topic and perform a join.
> > val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate
> > topic")
> > val joinedKStream =  someKStream.join(kTable, ...)
> >
> > Any thoughts on what could be a good approach?
> >
> > Srikanth
> >
>
>


Re: Track progress of kafka stream job

2016-06-02 Thread Srikanth
Matthias,

"""bin/kafka-consumer-groups.sh --zookeeper localhost:2181/kafka10
--list""" output didn't show the group I used in streams app.
Also, AbstractTask.java had a commit() API. That made me wonder if offset
management was overridden too.
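
(The --zookeeper variant only lists old-consumer groups; since a streams app
uses the new Java consumer and commits offsets to Kafka, the group — named
after application.id — should show up with something like the following, with
the broker address as a placeholder:)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
  --describe --group <application.id>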

I'm trying out KafkaStreams for one new streaming app we are working on.
We'll most likely stick to DSL for that.
Does the DSL expose any stat or debug info? Or any way to access the
underlying Context?

Srikanth

On Thu, Jun 2, 2016 at 9:30 AM, Matthias J. Sax 
wrote:

> Hi Srikanth,
>
> I am not exactly sure if I understand your question correctly.
>
> One way to track the progress is to get the current record offset (you
> can obtain it in the low level Processor API via the provided Context
> object).
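
(A minimal Scala sketch of that, assuming the 0.10.0 low-level Processor API;
the processor just logs its position and forwards each record unchanged:)

import org.apache.kafka.streams.processor.{Processor, ProcessorContext}

// Hedged sketch: expose the topic/partition/offset currently being processed.
class OffsetLoggingProcessor extends Processor[Integer, Integer] {
  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = { this.context = context }

  override def process(key: Integer, value: Integer): Unit = {
    println(s"${context.topic()}-${context.partition()} @ offset ${context.offset()}")
    context.forward(key, value)
  }

  override def punctuate(timestamp: Long): Unit = ()
  override def close(): Unit = ()
}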
>
> Otherwise, on commit, all writes to intermediate topics are flushed to
> Kafka and the source offsets get committed to Kafka, too.
>
> A KafkaStream application internally uses the standard high level Java
> KafkaConsumer (all instances of a single application belong to the same
> consumer group) and standard Java KafkaProducer.
>
> So you can use standard Kafka tools to access this information.
>
> Does this answer your question?
>
> -Matthias
>
> On 05/31/2016 09:10 PM, Srikanth wrote:
> > Hi,
> >
> > How can I track the progress of a kafka streaming job?
> > The only reference I see is "commit.interval.ms" which controls how
> often
> > offset is committed.
> > By default where is it committed and is there a tool to read it back? May
> > be something similar to bin/kafka-consumer-groups.sh.
> >
> > I'd like to look at details for source & intermediate topics too.
> >
> > Srikanth
> >
>
>


Re: Create KTable from two topics

2016-06-02 Thread Matthias J. Sax
I would not expect a performance difference.

-Matthias

On 06/02/2016 06:15 PM, Srikanth wrote:
> In terms of performance, there is not going to be much difference between to+table
> vs. through+aggregateByKey, right?
> 
> Srikanth
> 
> 
> On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax 
> wrote:
> 
>> Hi Srikanth,
>>
>> your third approach seems to be the best fit. It uses only one shuffle
>> of the data (which you cannot prevent in any case).
>>
>> If you want to put everything into a single application, you could use a
>> "dummy" custom aggregation to convert the KStream into a KTable instead
>> of writing into a topic and reading it from a second application.
>>
>> val kTable = metadataKTable
>>  .toStream()
>>  .map((k,v) => new KeyValue(v._1, v._2))
>>  .through("intermediate topic")
>>  .aggregateByKey(...);
>>
>> The aggregate function just replaces the old value with the new value
>> (ie, not really performing an aggregation).
>>
>> -Matthias
>>
>>
>> On 06/01/2016 08:03 PM, Srikanth wrote:
>>> Hello,
>>>
>>> How do I build a KTable from two topics such that key is in one topic and
>>> value in other?
>>>
>>> Ex,
>>> topic1 has a key called basekey and userId as value.
>>> topic2 has same basekey and locationId as value
>>>
>>> topic1 = {"basekey":1,"userId":111}
>>> topic1 = {"basekey":2,"userId":222}
>>>
>>> topic2 = {"basekey":1,"locId":888}
>>> topic2 = {"basekey":2,"locId":999}
>>>
>>> I want to build a KTable with userId as key and locationId as value.
>>> This KTable will be used to enrich a KStream that only has userId and
>> needs
>>> to be updated with locationId.
>>>
>>> val KTable1: KTable[Integer, Integer] =
>> kStreamBuilder.table(intSerde,
>>> intSerde, "topic1")  --> basekey is used as key
>>> val KTable2: KTable[Integer, Integer] =
>> kStreamBuilder.table(intSerde,
>>> intSerde, "topic2")  --> basekey is used as key
>>>
>>> val metadataKTable: KTable[Integer, Integer] =
>>>   KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc) )
>>>//.map((k,v) => (v._1, v._2) --> .map() is not supported
>> on
>>> KTable
>>>
>>> Problem is KTable doesn't have an API to update its key. It only has a
>>> mapValue().
>>> I guess since the key is used in underlying rocksDB, it isn't easy to
>>> change the key.
>>> I was exploring if I can pass it through() another topic using
>>> custom StreamPartitioner.
>>> That will let me partition using a field in value but still can't replace
>>> the key.
>>>
>>>
>>> Alternate one, is to join the KStream with topic1 to get "basekey". Then
>>> join it again with topic2 to get locationId.
>>> This will cause KStream to be shuffled twice.
>>>
>>>
>>> Alternate two, is to have this logic as a separate topology. That will
>>> write metadata to a topic.
>>> val metadataKStream = metadataKTable.toStream()
>>> .map((k,v) => new
>>> KeyValue(v._1, v._2))
>>> .to("intermediate topic")
>>>
>>> Another topology will read the stream topic and perform a join.
>>> val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate
>>> topic")
>>> val joinedKStream =  someKStream.join(kTable, ...)
>>>
>>> Any thoughts on what could be a good approach?
>>>
>>> Srikanth
>>>
>>
>>
> 





Re: Create KTable from two topics

2016-06-02 Thread Srikanth
In terms of performance, there is not going to be much difference between to+table
vs. through+aggregateByKey, right?

Srikanth


On Thu, Jun 2, 2016 at 9:21 AM, Matthias J. Sax 
wrote:

> Hi Srikanth,
>
> your third approach seems to be the best fit. It uses only one shuffle
> of the data (which you cannot prevent in any case).
>
> If you want to put everything into a single application, you could use a
> "dummy" custom aggregation to convert the KStream into a KTable instead
> of writing into a topic and reading it from a second application.
>
> val kTable = metadataKTable
>  .toStream()
>  .map((k,v) => new KeyValue(v._1, v._2))
>  .through("intermediate topic")
>  .aggregateByKey(...);
>
> The aggregate function just replaces the old value with the new value
> (ie, not really performing an aggregation).
>
> -Matthias
>
>
> On 06/01/2016 08:03 PM, Srikanth wrote:
> > Hello,
> >
> > How do I build a KTable from two topics such that key is in one topic and
> > value in other?
> >
> > Ex,
> > topic1 has a key called basekey and userId as value.
> > topic2 has same basekey and locationId as value
> >
> > topic1 = {"basekey":1,"userId":111}
> > topic1 = {"basekey":2,"userId":222}
> >
> > topic2 = {"basekey":1,"locId":888}
> > topic2 = {"basekey":2,"locId":999}
> >
> > I want to build a KTable with userId as key and locationId as value.
> > This KTable will be used to enrich a KStream that only has userId and
> needs
> > to be updated with locationId.
> >
> > val KTable1: KTable[Integer, Integer] =
> kStreamBuilder.table(intSerde,
> > intSerde, "topic1")  --> basekey is used as key
> > val KTable2: KTable[Integer, Integer] =
> kStreamBuilder.table(intSerde,
> > intSerde, "topic2")  --> basekey is used as key
> >
> > val metadataKTable: KTable[Integer, Integer] =
> >   KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc) )
> >//.map((k,v) => (v._1, v._2) --> .map() is not supported
> on
> > KTable
> >
> > Problem is KTable doesn't have an API to update its key. It only has a
> > mapValue().
> > I guess since the key is used in underlying rocksDB, it isn't easy to
> > change the key.
> > I was exploring if I can pass it through() another topic using
> > custom StreamPartitioner.
> > That will let me partition using a field in value but still can't replace
> > the key.
> >
> >
> > Alternate one, is to join the KStream with topic1 to get "basekey". Then
> > join it again with topic2 to get locationId.
> > This will cause KStream to be shuffled twice.
> >
> >
> > Alternate two, is to have this logic as a separate topology. That will
> > write metadata to a topic.
> > val metadataKStream = metadataKTable.toStream()
> > .map((k,v) => new
> > KeyValue(v._1, v._2))
> > .to("intermediate topic")
> >
> > Another topology will read the stream topic and perform a join.
> > val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate
> > topic")
> > val joinedKStream =  someKStream.join(kTable, ...)
> >
> > Any thoughts on what could be a good approach?
> >
> > Srikanth
> >
>
>


RE: Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Tauzell, Dave
The RollingFileAppender is what we are required to use in production.

-Dave


-Original Message-
From: Dustin Cote [mailto:dus...@confluent.io]
Sent: Thursday, June 02, 2016 9:51 AM
To: users@kafka.apache.org
Subject: Re: Changing default logger to RollingFileAppender (KAFKA-2394)

Just to clarify, do you mean you are using the RollingFileAppender in 
production, or the naming convention for DailyRollingFileAppender is required 
by your production systems?

On Thu, Jun 2, 2016 at 10:49 AM, Andrew Otto  wrote:

> +1, this is what Wikimedia uses in production.
>
> On Thu, Jun 2, 2016 at 10:38 AM, Tauzell, Dave <
> dave.tauz...@surescripts.com
> > wrote:
>
> > I haven't started using this in production but this is how I will
> > likely setup the logging as it is easier to manage.
> >
> > -Dave
> >
> > -Original Message-
> > From: Dustin Cote [mailto:dus...@confluent.io]
> > Sent: Thursday, June 02, 2016 9:33 AM
> > To: users@kafka.apache.org; d...@kafka.apache.org
> > Subject: Changing default logger to RollingFileAppender (KAFKA-2394)
> >
> > Hi all,
> >
> > I'm looking at changing the Kafka default logging setup to use the
> > RollingFileAppender instead of the DailyRollingFileAppender in an
> > effort
> to
> > accomplish two goals:
> > 1) Avoid filling up users' disks if the log files grow unexpectedly
> > 2) Move off the admittedly unreliable DailyRollingFileAppender
> >
> > I wanted to know if the community has any feedback around this
> > before moving forward.  The main drawback with going to the
> > RollingFileAppender
> is
> > that the log file names will no longer have timestamps, but instead
> > be of the form server.log, server.log.1, etc.  What users are
> > depending on the file name convention and would need to rollback the
> > log4j configuration should the default change in a later version?
> > What sort of feedback can those users provide to help us document this the 
> > right way?
> >
> > Thanks,
> >
> > --
> > Dustin Cote
> > confluent.io
> >
>



--
Dustin Cote
confluent.io


Re: Avro deserialization

2016-06-02 Thread Rick Mangi
Thanks!
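
For anyone hitting the same issue later, a rough sketch of the working setup
Michael describes below, loosely following the AvroClicksSessionizer example
(topic name, group id, and URLs are placeholders, and the exact class names
should be double-checked against the CP docs):

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class AvroConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "avro-consumer-sketch");
        props.put("schema.registry.url", "http://localhost:8081");

        // ConsumerConfig takes plain Properties; the Avro decoder takes VerifiableProperties
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(props));

        Map<String, List<KafkaStream<Object, Object>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("my-topic", 1),
                        decoder, decoder);

        // print every decoded record from the single stream of "my-topic"
        for (MessageAndMetadata<Object, Object> record : streams.get("my-topic").get(0)) {
            System.out.println(record.key() + " -> " + record.message());
        }
    }
}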


> On May 31, 2016, at 1:00 PM, Michael Noll  wrote:
> 
> FYI: I fixed the docs of schema registry (vProps -> props).
> 
> Best, Michael
> 
> 
> On Tue, May 31, 2016 at 2:05 AM, Rick Mangi  wrote:
> 
>> That was exactly the problem, I found the example here to be very helpful
>> -
>> https://github.com/confluentinc/examples/blob/master/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
>> 
>> IMHO it’s confusing having the same class names in different packages when
>> most people probably rely on an IDE to manage their imports.
>> 
>> Thanks!
>> 
>> Rick
>> 
>> 
>> On May 30, 2016, at 5:44 AM, Michael Noll  wrote:
>> 
>> Rick,
>> 
>> 
>> Is your code really importing the correct ConsumerConfig objects?
>> 
>> It should be:
>> 
>>   import kafka.consumer.ConsumerConfig;
>> 
>> If you are using your IDE's auto-import feature, you might however end up
>> with the following import, which will give you the "ConsumerConfig is not a
>> public class" compile error:
>> 
>>   import org.apache.kafka.clients.consumer.ConsumerConfig;
>> 
>> Lastly, it looks as if you need to update the following line as well:
>> 
>>   // Note that this constructs from props (j.u.Properties), not vProps
>> (VerifiableProperties)
>>   ConsumerConnector consumer =
>> kafka.consumer.Consumer.createJavaConsumerConnector(new
>> ConsumerConfig(props));
>> 
>> Let us know if this solves your error.  The CP 3.0.0 docs might need a fix
>> then (to change vProps to props).
>> 
>> Best,
>> Michael
>> 
>> 
>> 
>> On Sun, May 29, 2016 at 2:49 PM, Rick Mangi  wrote:
>> 
>> Hello all,
>> 
>> I’m trying to use the new schema registry to read avro encoded messages I
>> created with kafka connect as described here:
>> 
>> http://docs.confluent.io/3.0.0/schema-registry/docs/serializer-formatter.html
>> 
>> The example code is obviously not correct, but beyond the obvious, I can’t
>> seem to figure out how to register KafkaAvroDecoder with a consumer. The
>> example given
>> 
>> ConsumerConnector consumer =
>> kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(
>> vProps));
>> 
>> Is illegal, ConsumerConfig is a private class and can’t be instantiated.
>> It also seems that KafkaAvroDecoder does not implement Deserializer, and
>> thus can’t be used in the normal way deserializers are registered.
>> 
>> Has anyone gotten this stuff to work?
>> 
>> Thanks,
>> 
>> Rick
>> 
>> 
>> 
>> 
>> --
>> Best regards,
>> Michael Noll
>> 
>> 
>> 
>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
>> Apache Kafka and Confluent Platform: www.confluent.io/download
>> *
>> 
>> 
>> 
> 
> 
> --
> Best regards,
> Michael Noll
> 
> 
> 
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
> Apache Kafka and Confluent Platform: www.confluent.io/download
> *





Re: How to use HDP kafka?

2016-06-02 Thread Igor Kravzov
I used Ambari's automatic deployment and it does not require additional
configuration unless you need some changes.

On Wed, Jun 1, 2016 at 9:15 PM, Shaolu Xu  wrote:

> Hi All,
>
> I used the latest HDP 2.4 version.
> Did you do some configuration before used HDP? I searched a solution that
> is http://www.cnblogs.com/i2u9/p/ambari-kafka-multiip.html, but it not
> works for me.
>
> Attachment errorInfo and HDPConfig snapshot.
>
> Thanks in advance!
>
> Thanks,
> Nicole
>
>
> On Wed, Jun 1, 2016 at 8:44 PM, Igor Kravzov 
> wrote:
>
>> Hi,
>>
>> I am unable to see the images. But I use Kafka with HDP right now without
>> any problem.
>>
>> On Tue, May 31, 2016 at 9:33 PM, Shaolu Xu 
>> wrote:
>>
>> > Hi All,
>> >
>> >
>> > Anyone used HDP to run kafka, I used it and face a problem.The following
>> > is the error info:
>> >
>> > [image: Inline image 2]
>> >
>> >
>> > The following is my HDP configuration:
>> >
>> > [image: Inline image 1]
>> > Should I set some configuration on HDP.
>> >
>> >
>> > Thanks in advance.
>> >
>> >
>> > Thanks,
>> > Nicole
>> >
>>
>
>


Re: Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Dustin Cote
Just to clarify, do you mean you are using the RollingFileAppender in
production, or that the naming convention of the DailyRollingFileAppender is
required by your production systems?

On Thu, Jun 2, 2016 at 10:49 AM, Andrew Otto  wrote:

> +1, this is what Wikimedia uses in production.
>
> On Thu, Jun 2, 2016 at 10:38 AM, Tauzell, Dave <
> dave.tauz...@surescripts.com
> > wrote:
>
> > I haven't started using this in production but this is how I will likely
> > setup the logging as it is easier to manage.
> >
> > -Dave
> >
> > -Original Message-
> > From: Dustin Cote [mailto:dus...@confluent.io]
> > Sent: Thursday, June 02, 2016 9:33 AM
> > To: users@kafka.apache.org; d...@kafka.apache.org
> > Subject: Changing default logger to RollingFileAppender (KAFKA-2394)
> >
> > Hi all,
> >
> > I'm looking at changing the Kafka default logging setup to use the
> > RollingFileAppender instead of the DailyRollingFileAppender in an effort
> to
> > accomplish two goals:
> > 1) Avoid filling up users' disks if the log files grow unexpectedly
> > 2) Move off the admittedly unreliable DailyRollingFileAppender
> >
> > I wanted to know if the community has any feedback around this before
> > moving forward.  The main drawback with going to the RollingFileAppender
> is
> > that the log file names will no longer have timestamps, but instead be of
> > the form server.log, server.log.1, etc.  What users are depending on the
> > file name convention and would need to rollback the log4j configuration
> > should the default change in a later version?  What sort of feedback can
> > those users provide to help us document this the right way?
> >
> > Thanks,
> >
> > --
> > Dustin Cote
> > confluent.io
> >
>



-- 
Dustin Cote
confluent.io


Re: Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Andrew Otto
+1, this is what Wikimedia uses in production.

On Thu, Jun 2, 2016 at 10:38 AM, Tauzell, Dave  wrote:

> I haven't started using this in production but this is how I will likely
> setup the logging as it is easier to manage.
>
> -Dave
>
> -Original Message-
> From: Dustin Cote [mailto:dus...@confluent.io]
> Sent: Thursday, June 02, 2016 9:33 AM
> To: users@kafka.apache.org; d...@kafka.apache.org
> Subject: Changing default logger to RollingFileAppender (KAFKA-2394)
>
> Hi all,
>
> I'm looking at changing the Kafka default logging setup to use the
> RollingFileAppender instead of the DailyRollingFileAppender in an effort to
> accomplish two goals:
> 1) Avoid filling up users' disks if the log files grow unexpectedly
> 2) Move off the admittedly unreliable DailyRollingFileAppender
>
> I wanted to know if the community has any feedback around this before
> moving forward.  The main drawback with going to the RollingFileAppender is
> that the log file names will no longer have timestamps, but instead be of
> the form server.log, server.log.1, etc.  What users are depending on the
> file name convention and would need to rollback the log4j configuration
> should the default change in a later version?  What sort of feedback can
> those users provide to help us document this the right way?
>
> Thanks,
>
> --
> Dustin Cote
> confluent.io
>


RE: Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Tauzell, Dave
I haven't started using this in production, but this is how I will likely set up
the logging, as it is easier to manage.

-Dave

-Original Message-
From: Dustin Cote [mailto:dus...@confluent.io]
Sent: Thursday, June 02, 2016 9:33 AM
To: users@kafka.apache.org; d...@kafka.apache.org
Subject: Changing default logger to RollingFileAppender (KAFKA-2394)

Hi all,

I'm looking at changing the Kafka default logging setup to use the 
RollingFileAppender instead of the DailyRollingFileAppender in an effort to 
accomplish two goals:
1) Avoid filling up users' disks if the log files grow unexpectedly
2) Move off the admittedly unreliable DailyRollingFileAppender

I wanted to know if the community has any feedback around this before moving 
forward.  The main drawback with going to the RollingFileAppender is that the 
log file names will no longer have timestamps, but instead be of the form 
server.log, server.log.1, etc.  What users are depending on the file name 
convention and would need to rollback the log4j configuration should the 
default change in a later version?  What sort of feedback can those users 
provide to help us document this the right way?

Thanks,

--
Dustin Cote
confluent.io


Changing default logger to RollingFileAppender (KAFKA-2394)

2016-06-02 Thread Dustin Cote
Hi all,

I'm looking at changing the Kafka default logging setup to use the
RollingFileAppender instead of the DailyRollingFileAppender in an effort to
accomplish two goals:
1) Avoid filling up users' disks if the log files grow unexpectedly
2) Move off the admittedly unreliable DailyRollingFileAppender

I wanted to know if the community has any feedback around this before
moving forward.  The main drawback with going to the RollingFileAppender is
that the log file names will no longer have timestamps, but instead be of
the form server.log, server.log.1, etc.  Which users are depending on the
file name convention and would need to roll back the log4j configuration
should the default change in a later version?  What sort of feedback can
those users provide to help us document this the right way?
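
For anyone who wants to try it out ahead of any default change, a rough
log4j.properties sketch of what the RollingFileAppender setup could look like
(the size and backup count are placeholders to tune for your disks):

log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=100MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n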

Thanks,

-- 
Dustin Cote
confluent.io


Re: Dynamic bootstrap.servers with multiple data centers

2016-06-02 Thread Enrico Olivelli
I am in the same situation. I use ZooKeeper to publish Kafka broker
endpoints for dynamic discovery.

On Thu, Jun 2, 2016 at 14:33, Ismael Juma  wrote:

> Hi Danny,
>
> A KIP has not been drafted for that yet. Would you be interested in working
> on it?
>
> Ismael
>
> On Thu, Jun 2, 2016 at 1:15 PM, Danny Bahir  wrote:
>
> > Thanks Ben.
> >
> > The comments on the Jira mention a pluggable component that will manage
> > the bootstrap list from a discovery service.
> >
> > That's exactly what I need.
> >
> > Was a Kip drafted for this enhancement?
> >
> > -Danny
> >
> > > On Jun 1, 2016, at 7:05 AM, Ben Stopford  wrote:
> > >
> > > Hey Danny
> > >
> > > Currently the bootstrap servers are only used when the client
> > initialises (there’s a bit of discussion around the issue in the jira
> below
> > if you’re interested). To implement failover you’d need to catch a
> timeout
> > exception in your client code, consulting your service discovery
> mechanism
> > and reinitialise the client.
> > >
> > > KAFKA-3068 
> > >
> > > B
> > >
> > >> On 31 May 2016, at 22:09, Danny Bahir  wrote:
> > >>
> > >> Hello,
> > >>
> > >> Working on a multi data center Kafka installation in which all
> clusters
> > have the same topics, the producers will be able to connect to any of the
> > clusters. Would like the ability to dynamically control the set of
> clusters
> > a producer will be able to connect to, that will allow to gracefully
> take a
> > cluster offline for maintenance.
> > >> Current design is to have one zk cluster that is across all data
> > centers and will have info regarding what in which cluster a service is
> > available.
> > >>
> > >> In the case of Kafka it will house the info needed to populate
> > bootstrap.servers, a wrapper will be placed around the Kafka producer and
> > will watch this ZK value. When the value will change the producer
> instance
> > with the old value will be shut down and a new producer with the new
> > bootstrap.servers info will replace it.
> > >>
> > >> Is there a best practice for achieving this?
> > >>
> > >> Is there a way to dynamically update bootstrap.servers?
> > >>
> > >> Does the producer always go to the same machine from bootstrap.servers
> > when it refreshes the MetaData after metadata.max.age.ms has expired?
> > >>
> > >> Thanks!
> > >
> >
>
-- 


-- Enrico Olivelli


Re: Kafka broker slow down when consumer try to fetch large messages from topic

2016-06-02 Thread Tom Crayford
Hi there,

Firstly, a note that Kafka isn't really designed for this kind of large
message. However, http://ingest.tips/2015/01/21/handling-large-messages-kafka/
covers a lot of tips around this use case and some tuning that will likely
improve your usage.

In particular, I expect tuning up fetch.message.max.bytes on the consumer
to help out a lot here.
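
As a rough sketch, for the old high-level consumer that would be something like
(the value is illustrative and should be at least as large as the broker's
message.max.bytes; for the new consumer the equivalent knob is
max.partition.fetch.bytes):

fetch.message.max.bytes=10485760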

Generally though, large messages will lead to very low throughput and
lots of stability issues, as noted in that article. We run thousands of
clusters in production, and typically recommend folks keep message sizes
down to a few tens of KB for most use cases.

Thanks

Tom Crayford
Heroku Kafka

On Wed, Jun 1, 2016 at 9:49 PM, prateek arora 
wrote:

> I have 4 node kafka broker with following configuration :
>
> Default Number of Partitions  : num.partitions : 1
> Default Replication Factor : default.replication.factor : 1
> Maximum Message Size : message.max.bytes : 10 MB
> Replica Maximum Fetch Size : replica.fetch.max.bytes : 10 MB
>
>
> Right now I have 4 topic with 1 partition and 1 replication factor .
>
> "Topic Name" : "Broker Id" :  "Total Messages Received Across Kafka
> Broker" : "Total Bytes Received Across Kafka Broker"
> Topic 1  - Leader Kafka Broker 1 :  4.67 Message/Second  :  1.6 MB/second
> Topic 2  - Leader Kafka Broker 2 :  4.78 Message/Second  :  4.1 MB/second
> Topic 3  - Leader Kafka Broker 1 :  4.83  Message/Second   : 1.6 MB/second
> Topic 4  - Leader Kafka Broker 3  : 4.8 Message/Second   :   4.3 MB/second
>
> Message consist of .
>
>
> when consumer tried to read message from "Topic 2"  Kafka Broker rate of
>  message receiving slow down from 4.77 message/second to 3.12
> message/second  , after some time  try to goes up .
>
> I also attached screenshot of "Total Messages Received Across Kafka
> Broker"  and "Total Bytes Received Across Kafka Broker" for topic "Topic
> 2" .
>
> can someone explain why it is happen and how to solve it ?
>
> Regards
> Prateek
>
>
>
>
>
>
>


Re: Change Topic Name

2016-06-02 Thread Todd Palino
With the caveat that I’ve never tried this before...

I don’t see a reason why this wouldn’t work. There’s no topic information
that’s encoded in the log segments, as far as I’m aware. And there’s no
information about offsets stored in Zookeeper. So in theory, you should be
able to shut down the entire Kafka cluster (this will require a full
shutdown), and copy the log segments from one directory to another.
Obviously, the partition to broker assignments may not be the same, so
you’ll either need to align them with a partition reassignment beforehand,
or make sure you put the right partitions on the right brokers.

-Todd


On Wed, Jun 1, 2016 at 12:02 PM, Vladimir Picka 
wrote:

> Does creating new topic with new name and the same settings as the
> original one and directly copying files from kafka log directory into the
> new topic folder work? It would be nice if it would. I don't know if there
> is anything in log file format info or Zoo info attached to original topic
> which would prevent it to work.
>
> Petr
>
> -Original Message-
> From: Todd Palino [mailto:tpal...@gmail.com]
> Sent: 1. června 2016 10:48
> To: users@kafka.apache.org
> Subject: Re: Change Topic Name
>
> There's no way to do that. If you're trying to maintain data, you'll need
> to read all the data from the existing topic and produce it to the new one.
>
> -Todd
>
> On Wednesday, June 1, 2016, Johannes Nachtwey <
> johannes.nachtweyatw...@gmail.com> wrote:
>
> > Hi guys,
> >
> > it´s possible to change an existing topic name?
> >
> > Thanks and best wishes
> > Johannes
> >
>
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>
>


-- 
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Best monitoring tool for Kafka in production

2016-06-02 Thread Gerard Klijs
Not that I have anything against paying for monitoring, or against
Confluent, but you will need your consumers to be using Kafka 0.10 if you
want to make the most out of the Confluent solution. We are currently using
Zabbix; it's free, and it has complete functionality in one product. It
can be a bit verbose, though, and the graphs don't look very fancy.

On Thu, Jun 2, 2016 at 1:16 PM Michael Noll  wrote:

> Hafsa,
>
> since you specifically asked about non-free Kafka monitoring options as
> well:  As of version 3.0.0, the Confluent Platform provides a commercial
> monitoring tool for Kafka called Confluent Control Center.  (Disclaimer: I
> work for Confluent.)
>
> Quoting from the product page at
> http://www.confluent.io/product/control-center:
>
> "Know where your messages are at every step between source and destination.
> Identify slow brokers, delivery failures, and sleuth the truth out of
> unexpected latency in your network. Confluent Control Center delivers
> end-to-end stream monitoring. Unlike other monitoring tools, this one is
> purpose-built for your Kafka environment. Instead of identifying the
> throughput in your data center or other “clocks and cables” types of
> monitors, it tracks messages."
>
> Best wishes,
> Michael
>
>
>
>
> On Wed, May 25, 2016 at 12:42 PM, Hafsa Asif 
> wrote:
>
> > Hello,
> >
> > What is the best monitoring tool for Kafka in production, preferable free
> > tool? If there is no free tool, then please mention non-free efficient
> > monitoring tools also.
> >
> > We are feeling so much problem without monitoring tool. Sometimes brokers
> > goes down or consumer is not working, we are not informed.
> >
> > Best Regards,
> > Hafsa
> >
>


Re: Track progress of kafka stream job

2016-06-02 Thread Matthias J. Sax
Hi Srikanth,

I am not exactly sure if I understand your question correctly.

One way to track the progress is to get the current record offset (you
can obtain it in the low level Processor API via the provided Context
object).
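
For example, a minimal sketch of a low-level Processor that logs its own
progress (the class name is a placeholder; this assumes the 0.10.0 Processor
API):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ProgressLoggingProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // topic / partition / offset of the record currently being processed
        System.out.printf("progress: %s-%d @ offset %d%n",
                context.topic(), context.partition(), context.offset());
        context.forward(key, value);   // pass the record on unchanged
    }

    @Override
    public void punctuate(long timestamp) { }

    @Override
    public void close() { }
}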

Otherwise, on commit, all writes to intermediate topics are flushed to
Kafka and the source offsets get committed to Kafka, too.

A KafkaStreams application internally uses the standard high-level Java
KafkaConsumer (all instances of a single application belong to the same
consumer group) and standard Java KafkaProducer.

So you can use standard Kafka tools to access this information.
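
For example, if I remember the 0.10.0 tooling right, the Streams application.id
is registered as the group id of that embedded new consumer, so it should show
up via the new-consumer options of the consumer groups tool rather than via
--zookeeper (the group name is a placeholder):

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
  --describe --group my-streams-app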

Does this answer your question?

-Matthias

On 05/31/2016 09:10 PM, Srikanth wrote:
> Hi,
> 
> How can I track the progress of a kafka streaming job?
> The only reference I see is "commit.interval.ms" which controls how often
> offset is committed.
> By default where is it committed and is there a tool to read it back? May
> be something similar to bin/kafka-consumer-groups.sh.
> 
> I'd like to look at details for source & intermediate topics too.
> 
> Srikanth
> 





Re: Create KTable from two topics

2016-06-02 Thread Matthias J. Sax
Hi Srikanth,

your third approach seems to be the best fit. It uses only one shuffle
of the data (which you cannot prevent in any case).

If you want to put everything into a single application, you could use a
"dummy" custom aggregation to convert the KStream into a KTable instead
of writing into a topic and reading it from a second application.

val kTable = metadataKTable
 .toStream()
 .map((k,v) => new KeyValue(v._1, v._2))
 .through("intermediate topic")
 .aggregateByKey(...);

The aggregate function just replaces the old value with the new value
(i.e., not really performing an aggregation).
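
A rough Java sketch of that, assuming the 0.10.0-era aggregateByKey signature
(topic and store names are placeholders, and the exact overloads should be
double-checked against the javadocs):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
// topic1: basekey -> userId, topic2: basekey -> locId
KTable<Integer, Integer> users = builder.table(Serdes.Integer(), Serdes.Integer(), "topic1");
KTable<Integer, Integer> locations = builder.table(Serdes.Integer(), Serdes.Integer(), "topic2");

KTable<Integer, Integer> metadataByUser = users
    .join(locations, (userId, locId) -> new KeyValue<Integer, Integer>(userId, locId))
    .toStream()
    .map((basekey, pair) -> pair)                                     // re-key: userId becomes the key
    .through(Serdes.Integer(), Serdes.Integer(), "metadata-by-user")  // repartition on the new key
    .aggregateByKey(
        () -> null,                                                   // initializer: nothing seen yet
        (userId, locId, oldLocId) -> locId,                           // "aggregation" just keeps the latest value
        Serdes.Integer(), Serdes.Integer(), "metadata-by-user-store");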

-Matthias


On 06/01/2016 08:03 PM, Srikanth wrote:
> Hello,
> 
> How do I build a KTable from two topics such that key is in one topic and
> value in other?
> 
> Ex,
> topic1 has a key called basekey and userId as value.
> topic2 has same basekey and locationId as value
> 
> topic1 = {"basekey":1,"userId":111}
> topic1 = {"basekey":2,"userId":222}
> 
> topic2 = {"basekey":1,"locId":888}
> topic2 = {"basekey":2,"locId":999}
> 
> I want to build a KTable with userId as key and locationId as value.
> This KTable will be used to enrich a KStream that only has userId and needs
> to be updated with locationId.
> 
> val KTable1: KTable[Integer, Integer] = kStreamBuilder.table(intSerde,
> intSerde, "topic1")  --> basekey is used as key
> val KTable2: KTable[Integer, Integer] = kStreamBuilder.table(intSerde,
> intSerde, "topic2")  --> basekey is used as key
> 
> val metadataKTable: KTable[Integer, Integer] =
>   KTable1.join(KTable2, (user:Integer, loc:Integer) => (user, loc) )
>//.map((k,v) => (v._1, v._2) --> .map() is not supported  on
> KTable
> 
> Problem is KTable doesn't have an API to update its key. It only has a
> mapValue().
> I guess since the key is used in underlying rocksDB, it isn't easy to
> change the key.
> I was exploring if I can pass it through() another topic using
> custom StreamPartitioner.
> That will let me partition using a field in value but still can't replace
> the key.
> 
> 
> Alternate one, is to join the KStream with topic1 to get "basekey". Then
> join it again with topic2 to get locationId.
> This will cause KStream to be shuffled twice.
> 
> 
> Alternate two, is to have this logic as a separate topology. That will
> write metadata to a topic.
> val metadataKStream = metadataKTable.toStream()
> .map((k,v) => new
> KeyValue(v._1, v._2))
> .to("intermediate topic")
> 
> Another topology will read the stream topic and perform a join.
> val kTable =  kStreamBuilder.table(intSerde, intSerde, "intermediate
> topic")
> val joinedKStream =  someKStream.join(kTable, ...)
> 
> Any thoughts on what could be a good approach?
> 
> Srikanth
> 





Re: broker randomly shuts down

2016-06-02 Thread Tom Crayford
That looks like somebody is killing the process. I'd suspect either the
linux OOM killer or something else automatically killing the JVM for some
reason.

For the OOM killer, assuming you're on ubuntu, it's pretty easy to find in
/var/log/syslog (depending on your setup). I don't know about other
operating systems.
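
For example, something like this usually turns it up (the path differs per
distro):

grep -iE "out of memory|killed process" /var/log/syslog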

On Thu, Jun 2, 2016 at 5:54 AM, allen chan 
wrote:

> I have an issue where my brokers would randomly shut itself down.
> I turned on debug in log4j.properties but still do not see a reason why the
> shutdown is happening.
>
> Anyone seen this behavior before?
>
> version 0.10.0
> log4j.properties
> log4j.rootLogger=DEBUG, kafkaAppender
> * I tried TRACE level but i do not see any additional log messages
>
> snippet of log around shutdown
> [2016-06-01 15:11:51,374] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:11:53,376] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:11:55,377] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:11:57,380] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:11:59,383] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:01,386] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:03,389] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.GroupMetadataManager)
> [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.GroupMetadataManager)
> [2016-06-01 15:12:05,390] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:07,393] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:09,396] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:11,399] DEBUG Got ping response for sessionid:
> 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> (kafka.server.KafkaServer)
> [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> (kafka.server.KafkaServer)
> [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> shutdown (kafka.server.KafkaServer)
> [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> shutdown (kafka.server.KafkaServer)
> [2016-06-01 15:12:13,338] DEBUG Added sensor with name connections-closed:
> (org.apache.kafka.common.metrics.Metrics)
> [2016-06-01 15:12:13,338] DEBUG Added sensor with name connections-created:
> (org.apache.kafka.common.metrics.Metrics)
> [2016-06-01 15:12:13,338] DEBUG Added sensor with name bytes-sent-received:
> (org.apache.kafka.common.metrics.Metrics)
> [2016-06-01 15:12:13,338] DEBUG Added sensor with name bytes-sent:
> (org.apache.kafka.common.metrics.Metrics)
> [2016-06-01 15:12:13,339] DEBUG Added sensor with name bytes-received:
> (org.apache.kafka.common.metrics.Metrics)
> [2016-06-01 15:12:13,339] DEBUG Added sensor with name select-time:
> (org.apache.kafka.common.metrics.Metrics)
>
> --
> Allen Michael Chan
>


Re: Dynamic bootstrap.servers with multiple data centers

2016-06-02 Thread Ismael Juma
Hi Danny,

A KIP has not been drafted for that yet. Would you be interested in working
on it?

Ismael

On Thu, Jun 2, 2016 at 1:15 PM, Danny Bahir  wrote:

> Thanks Ben.
>
> The comments on the Jira mention a pluggable component that will manage
> the bootstrap list from a discovery service.
>
> That's exactly what I need.
>
> Was a Kip drafted for this enhancement?
>
> -Danny
>
> > On Jun 1, 2016, at 7:05 AM, Ben Stopford  wrote:
> >
> > Hey Danny
> >
> > Currently the bootstrap servers are only used when the client
> initialises (there’s a bit of discussion around the issue in the jira below
> if you’re interested). To implement failover you’d need to catch a timeout
> exception in your client code, consulting your service discovery mechanism
> and reinitialise the client.
> >
> > KAFKA-3068 
> >
> > B
> >
> >> On 31 May 2016, at 22:09, Danny Bahir  wrote:
> >>
> >> Hello,
> >>
> >> Working on a multi data center Kafka installation in which all clusters
> have the same topics, the producers will be able to connect to any of the
> clusters. Would like the ability to dynamically control the set of clusters
> a producer will be able to connect to, that will allow to gracefully take a
> cluster offline for maintenance.
> >> Current design is to have one zk cluster that is across all data
> centers and will have info regarding what in which cluster a service is
> available.
> >>
> >> In the case of Kafka it will house the info needed to populate
> bootstrap.servers, a wrapper will be placed around the Kafka producer and
> will watch this ZK value. When the value will change the producer instance
> with the old value will be shut down and a new producer with the new
> bootstrap.servers info will replace it.
> >>
> >> Is there a best practice for achieving this?
> >>
> >> Is there a way to dynamically update bootstrap.servers?
> >>
> >> Does the producer always go to the same machine from bootstrap.servers
> when it refreshes the MetaData after metadata.max.age.ms has expired?
> >>
> >> Thanks!
> >
>


Re: Dynamic bootstrap.servers with multiple data centers

2016-06-02 Thread Danny Bahir
Thanks Ben.

The comments on the Jira mention a pluggable component that will manage the 
bootstrap list from a discovery service. 

That's exactly what I need.

Was a KIP drafted for this enhancement?

-Danny
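
For reference, a rough sketch of the kind of producer-swapping wrapper described
in the quoted message below; this is not an existing Kafka API, and the class
name and config values are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class SwappableProducer {
    private volatile KafkaProducer<String, String> delegate;

    // called by the ZooKeeper watch whenever the published broker list changes
    public synchronized void onBootstrapServersChange(String newBootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", newBootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> old = delegate;
        delegate = new KafkaProducer<>(props);
        if (old != null) {
            old.close();   // flushes buffered records before closing
        }
    }

    public Future<RecordMetadata> send(String topic, String key, String value) {
        return delegate.send(new ProducerRecord<>(topic, key, value));
    }
}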

> On Jun 1, 2016, at 7:05 AM, Ben Stopford  wrote:
> 
> Hey Danny
> 
> Currently the bootstrap servers are only used when the client initialises 
> (there’s a bit of discussion around the issue in the jira below if you’re 
> interested). To implement failover you’d need to catch a timeout exception in 
> your client code, consulting your service discovery mechanism and 
> reinitialise the client. 
> 
> KAFKA-3068 
> 
> B
> 
>> On 31 May 2016, at 22:09, Danny Bahir  wrote:
>> 
>> Hello,
>> 
>> Working on a multi data center Kafka installation in which all clusters have 
>> the same topics, the producers will be able to connect to any of the 
>> clusters. Would like the ability to dynamically control the set of clusters 
>> a producer will be able to connect to, that will allow to gracefully take a 
>> cluster offline for maintenance.
>> Current design is to have one zk cluster that is across all data centers and 
>> will have info regarding what in which cluster a service is available.
>> 
>> In the case of Kafka it will house the info needed to populate 
>> bootstrap.servers, a wrapper will be placed around the Kafka producer and 
>> will watch this ZK value. When the value will change the producer instance 
>> with the old value will be shut down and a new producer with the new 
>> bootstrap.servers info will replace it.
>> 
>> Is there a best practice for achieving this?
>> 
>> Is there a way to dynamically update bootstrap.servers?
>> 
>> Does the producer always go to the same machine from bootstrap.servers when 
>> it refreshes the MetaData after metadata.max.age.ms has expired?
>> 
>> Thanks!
> 


Re: Best monitoring tool for Kafka in production

2016-06-02 Thread Michael Noll
Hafsa,

since you specifically asked about non-free Kafka monitoring options as
well:  As of version 3.0.0, the Confluent Platform provides a commercial
monitoring tool for Kafka called Confluent Control Center.  (Disclaimer: I
work for Confluent.)

Quoting from the product page at
http://www.confluent.io/product/control-center:

"Know where your messages are at every step between source and destination.
Identify slow brokers, delivery failures, and sleuth the truth out of
unexpected latency in your network. Confluent Control Center delivers
end-to-end stream monitoring. Unlike other monitoring tools, this one is
purpose-built for your Kafka environment. Instead of identifying the
throughput in your data center or other “clocks and cables” types of
monitors, it tracks messages."

Best wishes,
Michael




On Wed, May 25, 2016 at 12:42 PM, Hafsa Asif 
wrote:

> Hello,
>
> What is the best monitoring tool for Kafka in production, preferable free
> tool? If there is no free tool, then please mention non-free efficient
> monitoring tools also.
>
> We are feeling so much problem without monitoring tool. Sometimes brokers
> goes down or consumer is not working, we are not informed.
>
> Best Regards,
> Hafsa
>


Re: Does the Kafka Streams DSL support non-Kafka sources/sinks?

2016-06-02 Thread Eno Thereska
Hi Avi,

Using the low-level streams API you can definitely read or write to arbitrary 
locations inside the process() method.

However, back to your original question: even with the low-level streams API 
the sources and sinks can only be Kafka topics for now. So, as Gwen mentioned, 
Connect would be the way to go to bring the data to a Kafka Topic first.

Thanks
Eno

> On 2 Jun 2016, at 00:07, Avi Flax  wrote:
> 
> On 6/1/16, 18:59, "Gwen Shapira"  wrote:
> 
>> This would be Michael and Guozhang job to answer
> 
> I look forward to hearing from them ;)
> 
>> but I'd look at two options if I were you:
>> 
>> 1) If the connector you need exists (
>> http://www.confluent.io/product/connectors), then you just need to run it.
>> It is just a simple REST API (submit job, job status, etc), so I wouldn't
>> count it as "learning a framework".
> 
> Ah, I see. That’s good to know! Unfortunately in my case we have a 
> proprietary WebSockets API on one side and a proprietary database schema on 
> the other, so I’m doubtful this avenue would be applicable. :(
> 
>> 2) I believe I've seen people implement "writes to database" in
>> KafkaProcessor. Maybe try to google / search the mailing list? Guozhang and
>> Michael can probably add details.
> 
> Ah, very cool. I will do some searching. Thank you so much!
> 
> Avi
> 
> Software Architect @ Park Assist » We’re hiring!
> 



Re: Kafka encryption

2016-06-02 Thread Tom Crayford
Filesystem encryption is transparent to Kafka. You don't need to use SSL,
but your encryption requirements may cause you to need SSL as well.

With regards to compression, without adding at-rest encryption to Kafka
(which is a very major piece of work, one that for sure requires a KIP and
has many, many implications), there's not much to do there. I think it's
worth examining the threat models that require encryption on disk without
full disk encryption being suitable. Generally, compromised broker machines
mean an attacker will be able to sniff in-flight traffic anyway; if the
goal is to never leak messages even when an attacker has full control of the
broker machine, I'd suggest that seems pretty impossible under current
operating environments.

If the issue is compliance, I'd recommend querying whichever compliance
standard you're operating under about the suitability of full disk
encryption, and careful thought about encrypting the most sensitive parts
of messages. Whilst encryption in the producer and consumer does lead to
performance issues and decreases the ability of compression to shrink a
dataset, doing partial encryption of messages is easy enough.
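
As a sketch of the "wrap the serializer" idea mentioned further down this thread
(the cipher choice and key handling here are deliberately simplified
placeholders; a real implementation would want an authenticated mode like
AES/GCM with a proper IV and key management):

import org.apache.kafka.common.serialization.Serializer;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Map;

public class EncryptingSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;
    private final SecretKeySpec key;

    public EncryptingSerializer(Serializer<T> inner, byte[] rawKey) {
        this.inner = inner;
        this.key = new SecretKeySpec(rawKey, "AES");
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        try {
            Cipher cipher = Cipher.getInstance("AES");   // simplified; use AES/GCM + IV in practice
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(inner.serialize(topic, data));
        } catch (Exception e) {
            throw new RuntimeException("encryption failed", e);
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}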

Generally we've found that the kinds of uses of Kafka that require in-message
encryption (alongside full disk encryption and SSL, which we provide
as standard) don't have such high throughput needs that they worry about
compression etc. That clearly isn't true for all use cases though.

Thanks

Tom Crayford
Heroku Kafka

On Thursday, 2 June 2016, Gerard Klijs  wrote:

> You could add a header to every message, with information whether it's
> encrypted or not, then you don't have to encrypt all the messages, or you
> only do it for some topics.
>
> On Thu, Jun 2, 2016 at 6:36 AM Bruno Rassaerts <
> bruno.rassae...@novazone.be >
> wrote:
>
> > It works indeed but encrypting individual messages really influences the
> > batch compression done by Kafka.
> > Performance drops to about 1/3 of what it is without (even if we prepare
> > the encrypted samples upfront).
> > In the end what we going for is only encrypting what we really really
> need
> > to encrypt, not every message systematically.
> >
> > > On 31 May 2016, at 13:00, Gerard Klijs  > wrote:
> > >
> > > If you want system administrators not being able to see the data, the
> > only
> > > option is encryption, with only the clients sharing the key (or
> whatever
> > is
> > > used to (de)crypt the data). Like the example from eugene. I don't know
> > the
> > > kind of messages you have, but you could always wrap something around
> any
> > > (de)serializer your currently using.
> > >
> > > On Tue, May 31, 2016 at 12:21 PM Bruno Rassaerts <
> > > bruno.rassae...@novazone.be > wrote:
> > >
> > >> I’ve asked the same question in the past, and disk encryption was
> > >> suggested as a solution as well.
> > >> However, as far as I know, disk encryption will not prevent your data
> to
> > >> be stolen when the machine is compromised.
> > >> What we are looking for is even an additional barrier, so that even
> > system
> > >> administrators do not have access to the data.
> > >> Any suggestions ?
> > >>
> > >>> On 24 May 2016, at 14:40, Tom Crayford  > wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> There's no encryption at rest. It's recommended to use filesystem
> > >>> encryption, or encryption of each individual message before producing
> > it
> > >>> for this.
> > >>>
> > >>> Only the new producer and consumers have SSL support.
> > >>>
> > >>> Thanks
> > >>>
> > >>> Tom Crayford
> > >>> Heroku Kafka
> > >>>
> > >>> On Tue, May 24, 2016 at 11:33 AM, Snehalata Nagaje <
> > >>> snehalata.nag...@harbingergroup.com > wrote:
> > >>>
> > 
> > 
> >  Thanks for quick reply.
> > 
> >  Do you mean If I see messages in kafka, those will not be readable?
> > 
> >  And also, we are using new producer but old consumer , does old
> > consumer
> >  have ssl support?
> > 
> >  As mentioned in document, its not there.
> > 
> > 
> >  Thanks,
> >  Snehalata
> > 
> >  - Original Message -
> >  From: "Mudit Kumar" >
> >  To: users@kafka.apache.org 
> >  Sent: Tuesday, May 24, 2016 3:53:26 PM
> >  Subject: Re: Kafka encryption
> > 
> >  Yes,it does that.What specifically you are looking for?
> > 
> > 
> > 
> > 
> >  On 5/24/16, 3:52 PM, "Snehalata Nagaje" <
> >  snehalata.nag...@harbingergroup.com > wrote:
> > 
> > > Hi All,
> > >
> > >
> > > We have requirement of encryption in kafka.
> > >
> > > As per docs, we can configure kafka with ssl, for secured
> > >> communication.
> > >
> > > But does kafka also stores data in encrypted format?
> > >
> > >
> > > Thanks,
> > > Snehalata
> > 
> > >>
> > >>
> >
> >
>


Re: ClosedChannelException when trying to read from remote Kafka in AWS

2016-06-02 Thread Mudit Kumar
Glad to hear that your issue is fixed now!




On 6/2/16, 2:11 PM, "Marco B."  wrote:

>Hi Mudit,
>
>Thanks a lot for your answer.
>
>However, today we have set "advertised.host.name" on each kafka instance to
>the specific IP address of each node. For example, by default kafka tries
>to read the machine's hostname, specifically ip-10-1-83-5.ec2.internal -
>now it's "10.1.83.5" (of course, we had to do this for each hostname).
>
>I hope that all these solutions will help others with the same issue.
>
>Thanks a lot for your support!
>
>Kind regards,
>Marco
>
>
>2016-06-02 5:40 GMT+02:00 Mudit Kumar :
>
>> I do not think you need a public hostname. I have a similar setup and it's
>> perfectly fine.
>> What I would suggest is to change the hostname, make it persistent, and use
>> an FQDN everywhere with /etc/hosts entries locally and on the AWS machines. Your
>> problem will get fixed.
>>
>>
>>
>>
>> On 6/1/16, 8:54 PM, "Marco B."  wrote:
>>
>> >Hi Ben,
>> >
>> >Thanks for your answer. What if the instance does not have a public DNS
>> >hostname?
>> >These are all private nodes without public/elastic IP, therefore I don't
>> >know what to set.
>> >
>> >Marco
>> >
>> >2016-06-01 15:09 GMT+02:00 Ben Davison :
>> >
>> >> Hi Marco,
>> >>
>> >> We use the public DNS hostname that you can get from the AWS metadata
>> >> service.
>> >>
>> >> Thanks,
>> >>
>> >> Ben
>> >>
>> >> On Wed, Jun 1, 2016 at 1:54 PM, Marco B.  wrote:
>> >>
>> >> > Hello everyone,
>> >> >
>> >> > I am trying to setup a MirrorMaker between my company's local cluster
>> and
>> >> > another cluster in AWS to have replication over clusters. We have
>> setup a
>> >> > VPN between these two clusters, and as far as I can see, everything
>> works
>> >> > correctly, meaning that I can ping the nodes and telnet into them
>> without
>> >> > any issues.
>> >> >
>> >> > Now, when I run the following command in the local cluster to use a
>> >> > Zookeeper instance located in AWS (10.1.83.6:2181), in order to read
>> a
>> >> > topic "test"
>> >> >
>> >> > ~/kafka_2.11-0.8.2.2$ ./bin/kafka-console-consumer.sh --zookeeper
>> >> > 10.1.83.6:2181 --topic test --from-beginning
>> >> >
>> >> > A bunch of errors comes up:
>> >> >
>> >> > WARN Fetching topic metadata with correlation id 1 for topics
>> [Set(test)]
>> >> > from broker [id:2,host:ip-10-1-83-5.ec2.internal,port:9092] failed
>> >> > (kafka.client.ClientUtils$)
>> >> > java.nio.channels.ClosedChannelException
>> >> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>> >> > at
>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>> >> > at
>> >> >
>> >> >
>> >>
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>> >> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> >> > at
>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>> >> > at
>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>> >> > at
>> >> >
>> >> >
>> >>
>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>> >> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>> >> > WARN Fetching topic metadata with correlation id 1 for topics
>> [Set(test)]
>> >> > from broker [id:3,host:ip-10-1-83-6.ec2.internal,port:9092] failed
>> >> > (kafka.client.ClientUtils$)
>> >> >
>> >> > As far as I know, this is due to the fact that Zookeeper has
>> registered
>> >> an
>> >> > IP/Port for each Kafka instance and these need to be consistent with
>> the
>> >> > producer configuration, as described here (
>> >> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ).
>> >> >
>> >> > I tried to search on the web, and some people were recommending to
>> change
>> >> > the setting "advertised.host.name" to be either the public IP address
>> >> > coming from AWS (we cannot) or a specific hostname. Now, considering
>> that
>> >> > we have a VPN between the clusters, the only choice left seems to be
>> the
>> >> > one setting the hostname.
>> >> >
>> >> > What should this value be? Is there anything else I need to know for
>> this
>> >> > kind of setup? Any suggestions?
>> >> >
>> >> > Thanks in advance.
>> >> >
>> >> > Kind regards,
>> >> > Marco
>> >> >
>> >>
>> >> --
>> >>
>> >>
>> >>
>>
>>



Re: ClosedChannelException when trying to read from remote Kafka in AWS

2016-06-02 Thread Marco B.
Hi Mudit,

Thanks a lot for your answer.

However, today we have set "advertised.host.name" on each Kafka instance to
the specific IP address of that node. For example, by default Kafka tries
to read the machine's hostname, specifically ip-10-1-83-5.ec2.internal -
now it's "10.1.83.5" (of course, we had to do this on each node).
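
In other words, something like this in each broker's server.properties (the IP
is per node; ours is just an example):

advertised.host.name=10.1.83.5
port=9092

With that in place, the metadata returned to clients and MirrorMaker on the
other side of the VPN contains 10.1.83.5:9092 instead of the unresolvable
ip-10-1-83-5.ec2.internal name.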

I hope that all these solutions will help others with the same issue.

Thanks a lot for your support!

Kind regards,
Marco


2016-06-02 5:40 GMT+02:00 Mudit Kumar :

> I do not think you need a public hostname. I have a similar setup and it's
> perfectly fine.
> What I would suggest is to change the hostname, make it persistent, and use
> an FQDN everywhere with /etc/hosts entries locally and on the AWS machines. Your
> problem will get fixed.
>
>
>
>
> On 6/1/16, 8:54 PM, "Marco B."  wrote:
>
> >Hi Ben,
> >
> >Thanks for your answer. What if the instance does not have a public DNS
> >hostname?
> >These are all private nodes without public/elastic IP, therefore I don't
> >know what to set.
> >
> >Marco
> >
> >2016-06-01 15:09 GMT+02:00 Ben Davison :
> >
> >> Hi Marco,
> >>
> >> We use the public DNS hostname that you can get from the AWS metadata
> >> service.
> >>
> >> Thanks,
> >>
> >> Ben
> >>
> >> On Wed, Jun 1, 2016 at 1:54 PM, Marco B.  wrote:
> >>
> >> > Hello everyone,
> >> >
> >> > I am trying to setup a MirrorMaker between my company's local cluster
> and
> >> > another cluster in AWS to have replication over clusters. We have
> setup a
> >> > VPN between these two clusters, and as far as I can see, everything
> works
> >> > correctly, meaning that I can ping the nodes and telnet into them
> without
> >> > any issues.
> >> >
> >> > Now, when I run the following command in the local cluster to use a
> >> > Zookeeper instance located in AWS (10.1.83.6:2181), in order to read
> a
> >> > topic "test"
> >> >
> >> > ~/kafka_2.11-0.8.2.2$ ./bin/kafka-console-consumer.sh --zookeeper
> >> > 10.1.83.6:2181 --topic test --from-beginning
> >> >
> >> > A bunch of errors comes up:
> >> >
> >> > WARN Fetching topic metadata with correlation id 1 for topics
> [Set(test)]
> >> > from broker [id:2,host:ip-10-1-83-5.ec2.internal,port:9092] failed
> >> > (kafka.client.ClientUtils$)
> >> > java.nio.channels.ClosedChannelException
> >> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >> > at
> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >> > at
> >> >
> >> >
> >>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >> > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >> > at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >> > at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> >> > at
> >> >
> >> >
> >>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> >> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >> > WARN Fetching topic metadata with correlation id 1 for topics
> [Set(test)]
> >> > from broker [id:3,host:ip-10-1-83-6.ec2.internal,port:9092] failed
> >> > (kafka.client.ClientUtils$)
> >> >
> >> > As far as I know, this is due to the fact that Zookeeper has
> registered
> >> an
> >> > IP/Port for each Kafka instance and these need to be consistent with
> the
> >> > producer configuration, as described here (
> >> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ).
> >> >
> >> > I tried to search on the web, and some people were recommending to
> change
> >> > the setting "advertised.host.name" to be either the public IP address
> >> > coming from AWS (we cannot) or a specific hostname. Now, considering
> that
> >> > we have a VPN between the clusters, the only choice left seems to be
> the
> >> > one setting the hostname.
> >> >
> >> > What should this value be? Is there anything else I need to know for
> this
> >> > kind of setup? Any suggestions?
> >> >
> >> > Thanks in advance.
> >> >
> >> > Kind regards,
> >> > Marco
> >> >
> >>
> >> --
> >>
> >>
> >>
>
>