Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Michael Noll
Ratha,

if you based your problematic code on the PipeDemo example, then you should
have these two lines in your code (which most probably you haven't changed):

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());

This configures your application to interpret (= encode/decode), by
default, the keys and values of any messages it reads from Kafka as
strings.  This works for the PipeDemo example because the keys and values
are actually strings.

In your application, however, you do:

   KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(sourceTopics);

This won't work, because `builder.stream()`, when called without
explicit serdes, will use the default serdes configured for your
application.  So `builder.stream(sourceTopics)` will give you a
`KStream<String, String>`, not a `KStream<String, KafkaPayload>`.  Also, you
can't just cast a String to KafkaPayload to "fix" the problem; if you
attempt to do so, you run into the ClassCastException that you reported
below.

What you need to do to fix your problem is:

1. Provide a proper serde for `KafkaPayload` (a minimal sketch follows after
step 2 below).  See
http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
There are also example implementations of such custom serdes at [1] and [2].

Once you have that, you can e.g. write:

final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided
by you!

2.  Call `builder.stream()` with explicit serdes to override the default
serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.

KStream<String, KafkaPayload> kafkaPayloadStream =
builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);

That should do it.
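
For illustration, here is a minimal sketch of what step 1 could look like,
assuming that `KafkaPayload` implements java.io.Serializable and using plain
Java serialization.  The class name and approach are illustrative assumptions
only (a JSON- or Avro-based serializer would be wired up the same way):

import java.io.*;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaPayloadSerializer
        implements Serializer<KafkaPayload>, Deserializer<KafkaPayload> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, KafkaPayload payload) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(payload);      // Java-serialize the POJO
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize KafkaPayload", e);
        }
    }

    @Override
    public KafkaPayload deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;                   // Kafka hands us null for null message values
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (KafkaPayload) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Failed to deserialize KafkaPayload", e);
        }
    }

    @Override
    public void close() { }
}

The serde needed in step 2 can then be assembled via Serdes.serdeFrom(), e.g.:

KafkaPayloadSerializer impl = new KafkaPayloadSerializer();
final Serde<KafkaPayload> kafkaPayloadSerde = Serdes.serdeFrom(impl, impl);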

Lastly, you must think about serialization also when calling `to()` or
`through()`:

kafkaPayloadStream.to(targetTopic);

If you haven't changed to default key and value serdes, then `to()` will
fail because it will by default (in your app configuration) interpret
message values still as strings rather than KafkaPayload.  To fix this you
should call:

kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);

You need to override the default serdes whenever the data must be written
with, well, non-default serdes.

I'd recommend reading
http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
to better understand how this works.


Hope this helps,
Michael



[1]
http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
[2]
https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils




On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:

> I checked my target topic and I see fewer messages than in the source topic. (If
> the source topic has 5 messages, I see 2 messages in my target topic.) What
> settings do I need to change?
>
> And, when I try to consume message from the target topic, I get ClassCast
> Exception.
>
> java.lang.ClassCastException: java.lang.String cannot be cast to
> xx.yy.core.kafkamodels.KafkaPayload;
>
> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>
>
> I merge the two topics like this:
>
> * KStreamBuilder builder = new KStreamBuilder();*
>
> * KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(sourceTopics);*
>
> * kafkaPayloadStream.to(targetTopic);*
>
> * streams = new KafkaStreams(builder, properties);*
>
> * streams.start();*
>
>
> Why do I see classcast exception when consuming the message?
>
>
> On 11 October 2016 at 15:19, Ratha v  wrote:
>
> > Hi all;
> > I have a custom datatype defined (a POJO class).
> > I copy messages from one topic to another topic.
> > I do not see any messages in my target topic.
> > This works for string messages, but not for my custom message.
> > What might be the cause?
> > I followed this sample [1]
> > [1]
> > https://github.com/apache/kafka/blob/trunk/streams/
> > examples/src/main/java/org/apache/kafka/streams/examples/
> > pipe/PipeDemo.java
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Michael Noll
When I wrote:

"If you haven't changed to default key and value serdes, then `to()`
will fail because [...]"

it should have read:

"If you haven't changed the default key and value serdes, then `to()`
will fail because [...]"



On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll  wrote:

> Ratha,
>
> if you based your problematic code on the PipeDemo example, then you
> should have these two lines in your code (which most probably you haven't
> changed):
>
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass());
>
> This configures your application to interpret (= encode/decode), by
> default, the keys and values of any messages it reads from Kafka as
> strings.  This works for the PipeDemo example because the keys and values
> are actually strings.
>
> In your application, however, you do:
>
>    KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(sourceTopics);
>
> This won't work, because `builder.stream()`, when calling it without
> explicit serdes, will use the default serdes configured for your
> application.  So `builder.stream(sourceTopics)` will give you
> `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also, you
> can't just cast a String to KafkaPayload to "fix" the problem;  if you
> attempt to do so you run into the ClassCastException that you reported
> below.
>
> What you need to do fix your problem is:
>
> 1. Provide a proper serde for `KafkaPayload`.  See
> http://docs.confluent.io/current/streams/developer-
> guide.html#implementing-custom-serializers-deserializers-serdes.  There
> are also example implementations of such custom serdes at [1] and [2].
>
> Once you have that, you can e.g. write:
>
> final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
> final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided
> by you!
>
> 2.  Call `builder.stream()` with explicit serdes to overrides the default
> serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.
>
> KStream<String, KafkaPayload> kafkaPayloadStream =
> builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>
> That should do it.
>
> Lastly, you must think about serialization also when calling `to()` or
> `through()`:
>
> kafkaPayloadStream.to(targetTopic);
>
> If you haven't changed to default key and value serdes, then `to()` will
> fail because it will by default (in your app configuration) interpret
> message values still as strings rather than KafkaPayload.  To fix this you
> should call:
>
> kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>
> You need to override the default serdes whenever the data must be written
> with, well, non-default serdes.
>
> I'd recommend reading http://docs.confluent.io/current/streams/developer-
> guide.html#data-types-and-serialization to better understand how this
> works.
>
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/streams/developer-
> guide.html#available-serializers-deserializers-serdes
> [2] https://github.com/confluentinc/examples/tree/
> kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/
> confluent/examples/streams/utils
>
>
>
>
> On Tue, Oct 11, 2016 at 7:38 AM, Ratha v  wrote:
>
>> I checked my target topic and I see few messages than the source topic.
>> (If
>> source topic have 5 messages, I see 2 messages in my target topic) What
>> settings I need to do ?
>>
>> And, when I try to consume message from the target topic, I get ClassCast
>> Exception.
>>
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> xx.yy.core.kafkamodels.KafkaPayload;
>>
>> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>>
>>
>> I Merge two topics like;
>>
>> * KStreamBuilder builder = new KStreamBuilder();*
>>
>> * KStream kafkaPayloadStream =
>> builder.stream(sourceTopics);*
>>
>> * kafkaPayloadStream.to(targetTopic);*
>>
>> * streams = new KafkaStreams(builder, properties);*
>>
>> * streams.start();*
>>
>>
>> Why do I see classcast exception when consuming the message?
>>
>>
>> On 11 October 2016 at 15:19, Ratha v  wrote:
>>
>> > Hi all;
>> > I have custom datatype defined (a pojo class).
>> > I copy  messages from one topic to another topic.
>> > I do not see any messages in my target topic.
>> > This works fro string messages, but not for my custom message.
>> > Waht might be the cause?
>> > I followed this sample [1]
>> > [1]
>> > https://github.com/apache/kafka/blob/trunk/streams/
>> > examples/src/main/java/org/apache/kafka/streams/examples/
>> > pipe/PipeDemo.java
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>


Re: punctuate() never called

2016-10-11 Thread Michael Noll
Thanks for the follow-up and the bug report, David.

We're taking a look at that.
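
For context, here is a minimal sketch of the kind of processor under
discussion in the quoted thread below; the class name and method bodies are
illustrative assumptions, not David's actual code:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PeriodicWorkProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Ask Streams to call punctuate() roughly every 60 seconds.  As the
        // discussion below explains, this is driven by the timestamps of
        // arriving records, so if the consumed topics stop delivering data,
        // punctuate() stops being called as well.
        context.schedule(60 * 1000L);
    }

    @Override
    public void process(String key, String value) {
        // update state / buffer incoming records here
    }

    @Override
    public void punctuate(long timestamp) {
        // the periodic work goes here
    }

    @Override
    public void close() { }
}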



On Mon, Oct 10, 2016 at 4:36 PM, David Garcia  wrote:

> Thx for the responses.  I was able to identify a bug in how the times are
> obtained (offsets resolved as unknown cause the issue):
>
> “Actually, I think the bug is more subtle.  What happens when a consumed
> topic stops receiving messages?  The smallest timestamp will always be the
> static timestamp of this topic.
>
> -David
>
> On 10/7/16, 5:03 PM, "David Garcia"  wrote:
>
> Ok I found the bug.  Basically, if there is an empty topic (in the
> list of topics being consumed), any partition-group with partitions from
> the topic will always return -1 as the smallest timestamp (see
> PartitionGroup.java).
>
> To reproduce, simply start a kstreams consumer with one or more empty
> topics.  Punctuate will never be called.
>
> -David ”
>
> On 10/10/16, 1:55 AM, "Michael Noll"  wrote:
>
> > We have run the application (and have confirmed data is being
> received)
> for over 30 mins…with a 60-second timer.
>
> Ok, so your app does receive data but punctuate() still isn't being
> called.
> :-(
>
>
> > So, do we need to just rebuild our cluster with bigger machines?
>
> That's worth trying out.  See
> http://www.confluent.io/blog/design-and-deployment-
> considerations-for-deploying-apache-kafka-on-aws/
> for some EC2 instance types recommendations.
>
> But I'd also suggest to look into the logs of (1) your application,
> (2) the
> log files of the Kafka broker(s), and (3) the log files of ZooKeeper
> to see
> whether you see anything suspicious?
>
> Sorry for not being able to provide more actionable feedback at this
> point.  Typically we have seen such issues only (but not exclusively)
> in
> cases where there have been problems in the environment in which your
> application is running and/or the environment of the Kafka clusters.
> Unfortunately these environment problems are a bit tricky to debug
> remotely
> via the mailing list.
>
> -Michael
>
>
>
>
>
> On Fri, Oct 7, 2016 at 8:11 PM, David Garcia 
> wrote:
>
> > Yeah, this is possible.  We have run the application (and have
> confirmed
> > data is being received) for over 30 mins…with a 60-second timer.
> So, do we
> > need to just rebuild our cluster with bigger machines?
> >
> > -David
> >
> > On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
> >
> > David,
> >
> > punctuate() is still data-driven at this point, even when you're
> using
> > the
> > WallClock timestamp extractor.
> >
> > To use an example: Imagine you have configured punctuate() to be
> run
> > every
> > 5 seconds.  If there's no data being received for a minute, then
> > punctuate
> > won't be called -- even though you probably would have expected
> this to
> > happen 12 times during this 1 minute.
> >
> > (FWIW, there's an ongoing discussion to improve punctuate(),
> part of
> > which
> > is motivated by the current behavior that arguably is not very
> > intuitive to
> > many users.)
> >
> > Could this be the problem you're seeing?  See also the related
> > discussion
> > at
> > http://stackoverflow.com/questions/39535201/kafka-problems-with-
> > timestampextractor
> > .
> >
> >
> >
> >
> >
> >
> > On Fri, Oct 7, 2016 at 6:07 PM, David Garcia <
> dav...@spiceworks.com>
> > wrote:
> >
> > > Hello, I’m sure this question has been asked many times.
> > > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> > m4.xlarges.  We
> > > have an application that needs to use the punctuate() function
> to do
> > some
> > > work on a regular interval.  We are using the WallClock
> extractor.
> > > Unfortunately, the method is never called.  I have checked the
> > > filedescriptor setting for both the user as well as the
> process, and
> > > everything seems to be fine.  Is this a known bug, or is there
> > something
> > > obvious I’m missing?
> > >
> > > One note, the application used to work on this cluster, but
> now it’s
> > not
> > > working.  Not really sure what is going on?
> > >
> > > -David
> > >
> >
> >
> >
>
>
>


Re: Support for Kafka

2016-10-11 Thread Michael Noll
Regarding the JVM, we recommend running the latest version of JDK 1.8 with
the G1 garbage collector:
http://docs.confluent.io/current/kafka/deployment.html#jvm

And yes, Kafka does run on Ubuntu 16.04, too.

(Confluent provides .deb packages [1] for Apache Kafka if you are looking
for these to install Kafka on Ubuntu.)

Hope this helps,
Michael



[1] http://docs.confluent.io/current/installation.html




On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:

> Hi Syed,
>
> Apache Kafka runs on a JVM. I think the question you should ask is -- which
> JVM does Apache Kafka require in production*? It doesn't really depend on
> anything on a specific Linux distribution.
>
> * ...and I don't have that answer ;-)
>
> Cheers,
> Jens
>
> On Wednesday, October 5, 2016, Syed Hussaini <
> syed.hussa...@theexchangelab.com> wrote:
>
> > Dear Kafka team.
> >
> > I am in the Implementation stage of Kafka cluster and looking to find
> > out does Apache Kafka supported for Ubuntu 16.04 LTS – Xenial.
> >
> >
> >
> > Would be great if you please let us know.
> >
> >
> >
> >
> >
> > [image: The Exchange Lab] 
> >
> > *Syed Hussaini*
> > Infrastructure Engineer
> >
> > 1 Neathouse Place
> > 5th Floor
> > London, England, SW1V 1LH
> >
> >
> > syed.hussa...@theexchangelab.com
> > 
> >
> > T 0203 701 3177
> >
> >
> > --
> >
> > Follow us on Twitter: @exchangelab  |
> Visit
> > us on LinkedIn: The Exchange Lab
> > 
> >
> >
> >
> >
> >
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook  Linkedin
>  companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%
> 2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>  Twitter 
>


Re: Support for Kafka

2016-10-11 Thread Michael Noll
Actually, I wanted to include the following link for the JVM docs (the
information matches what's written in the earlier link I shared):
http://kafka.apache.org/documentation#java


On Tue, Oct 11, 2016 at 11:21 AM, Michael Noll  wrote:

> Regarding the JVM, we recommend running the latest version of JDK 1.8 with
> the G1 garbage collector:
> http://docs.confluent.io/current/kafka/deployment.html#jvm
>
> And yes, Kafka does run on Ubuntu 16.04, too.
>
> (Confluent provides .deb packages [1] for Apache Kafka if you are looking
> for these to install Kafka on Ubuntu.)
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/installation.html
>
>
>
>
> On Mon, Oct 10, 2016 at 1:38 PM, Jens Rantil  wrote:
>
>> Hi Syed,
>>
>> Apache Kafka runs on a JVM. I think the question you should ask is --
>> which
>> JVM does Apache Kafka require in production*? It doesn't really depend on
>> anything on a specific Linux distribution.
>>
>> * ...and I don't have that answer ;-)
>>
>> Cheers,
>> Jens
>>
>> On Wednesday, October 5, 2016, Syed Hussaini <
>> syed.hussa...@theexchangelab.com> wrote:
>>
>> > Dear Kafka team.
>> >
>> > I am in the Implementation stage of Kafka cluster and looking to
>> find
>> > out does Apache Kafka supported for Ubuntu 16.04 LTS – Xenial.
>> >
>> >
>> >
>> > Would be great if you please let us know.
>> >
>> >
>> >
>> >
>> >
>> > [image: The Exchange Lab] 
>> >
>> > *Syed Hussaini*
>> > Infrastructure Engineer
>> >
>> > 1 Neathouse Place
>> > 5th Floor
>> > London, England, SW1V 1LH
>> >
>> >
>> > syed.hussa...@theexchangelab.com
>> > 
>> >
>> > T 0203 701 3177
>> >
>> >
>> > --
>> >
>> > Follow us on Twitter: @exchangelab  |
>> Visit
>> > us on LinkedIn: The Exchange Lab
>> > 
>> >
>> >
>> >
>> >
>> >
>>
>>
>> --
>> Jens Rantil
>> Backend engineer
>> Tink AB
>>
>> Email: jens.ran...@tink.se
>> Phone: +46 708 84 18 32
>> Web: www.tink.se
>>
>> Facebook  Linkedin
>> > res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVS
>> RPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
>>  Twitter 
>>
>
>
>
>


Re: Tuning for high RAM and 10GBe

2016-10-11 Thread Eno Thereska
Sounds good that you got up to 500MB/s. At that point I suspect you reach a 
sort of steady state where the cache is continuously flushing to the SSDs, so 
you are effectively bottlenecked by the SSD. I believe this is as expected (the 
bottleneck resource will dominate the end to end throughput even if you have a 
memory buffer to temporarily hold produce records).

If you want to be completely in-memory, that's ok for experimental purposes 
(e.g, using a RAM-based file system) but you'll then have to start worrying 
about failures if you are in production. However, if you are interested in just 
exploring perf for now, using an in-memory file system will be fine. You could 
even try having 3-way replication, all in memory (with 3 brokers). Again, note 
of caution: a correlated power failure can lose all your replicated in-memory 
data, so in production even if you replicate in memory you'd still want to 
write somewhere persistent. Unless your memory is battery-backed, but that's a 
longer story.

Cheers
Eno

> On 11 Oct 2016, at 03:49, Christopher Stelly  wrote:
> 
> With that link I came across the producer-perf-test tool, quite useful as
> it gets rid of the Go (Sarama) variable. Since it can quickly tweak
> settings, it's extremely useful.
> 
> As you suggested Eno, I attempted to copy the LinkedIn settings. With 100
> byte records, I get up to about 600,000 records/second. Still not quite
> what they were able to do with cheap hardware.
> 
> As far as throughput, I tend to max out at around 200MB/s on a single
> producer (record size 1, no acks, linger 100, batch size 100 and
> with compression), along with a generous HEAP_OPT env setting of 50G.
> 
> If I do the above settings with 4 producers, things start to slow down. It
> seems to add up to around 500MB/s, which is about what the SSD can write
> at.
> 
> Could this number be improved if I let the memory take care of this instead
> of flushing to disk? I understand that Kafka likes to flush often, but even
> relaxing broker's flush settings I can't seem to make an impact on this
> 500MB/s number (w/ 4 producers). After all, we have a lot of RAM to play
> with. The hackish solution would be to make a tempfs mount and store
> kafka-logs there, but that seems like the wrong approach. Any thoughts? Do
> you think flushing is my hold up at this point?
> 
> Again, thanks!
> 
> On Mon, Oct 10, 2016 at 12:45 PM, Christopher Stelly 
> wrote:
> 
>> Sure, good ideas. I'll try multiple producers, localhost and LAN, to see
>> if any difference
>> 
>> Yep, Gwen, the Sarama client. Anything to worry about there outside of
>> setting the producer configs (which would you set?) and number of buffered
>> channels? (currently, buffered channels up to 10k).
>> 
>> Thanks!
>> 
>> On Mon, Oct 10, 2016 at 12:04 PM, Gwen Shapira  wrote:
>> 
>>> Out of curiosity - what is "Golang's Kafka interface"? Are you
>>> referring to Sarama client?
>>> 
>>> On Sun, Oct 9, 2016 at 9:28 AM, Christopher Stelly 
>>> wrote:
 Hello,
 
 The last thread available regarding 10GBe is about 2 years old, with no
 obvious recommendations on tuning.
 
 Is there a more complex tuning guide than the example production config
 available on Kafka's main site? Anything other than the list of possible
 configs?
 
 I currently have access to a rather substantial academic cluster to test
 on, including multiple machines with the following hardware:
 
 10GBe NICs
 250GB RAM each
 SSDs on each
 (also, optional access to single NVMe)
 
 Using Golang's Kafka interface, I can only seem to get about 80MB/s on
>>> the
 producer pushing to logs on the localhost, using no replication and
>>> reading
 from/logging to SSD. If it helps, I can post my configs. I've tried
 fiddling with a bunch of broker configs as well as producer configs,
 raising the memory limits, max message size, io&network threads etc.
 
 Since the last post from 2014 indicates that there is no public
 benchmarking for 10GBe, I'd be happy to run benchmarks /publish results
>>> on
 this hardware if we can get it tuned up properly.
 
 What kind of broker/producer/consumer settings would you recommend?
 
 Thanks!
 - chris
>>> 
>>> 
>>> 
>>> --
>>> Gwen Shapira
>>> Product Manager | Confluent
>>> 650.450.2760 | @gwenshap
>>> Follow us: Twitter | blog
>>> 
>> 
>> 



Can broker recover to alive when zk callback miss?

2016-10-11 Thread 涂扬
hi,
We have hit an issue where the broker's ephemeral node in ZooKeeper was lost
when the network between the broker and the ZooKeeper cluster was not good enough,
while the broker process itself was still alive. As we know, the controller then
considers that broker to be offline in Kafka. After enabling the zkClient log, we
can see the connection state between the broker and the ZooKeeper cluster change
from disconnected to connected, but the newSession callback is not called, so this
broker cannot come back alive without a restart.
So we decided to add a heartbeat mechanism at the application layer between
client and broker, separate from the zkClient heartbeat. Can we immediately
re-register this broker in ZooKeeper when we detect that its ephemeral node is
missing from the ZooKeeper path, or how else can we solve this?
The main problem is that the watch callback can be missed; how can we work
around that?
Thanks.


broker upgrade

2016-10-11 Thread David Garcia
Hello, we are going to be upgrading the instance types of our brokers.  We will
shut them down, upgrade, and then restart them.  All told, they will be down for
about 15 minutes.  Upon restart, is there anything we need to do other than run
preferred leader election?  The brokers will start to catch up on their own,
right?  How long should it take for them to catch up (in terms of how long they
are down)?  I would assume it would be less than 15 minutes.

Any tips are appreciated.  I’ve been looking at this: 
http://kafka.apache.org/documentation.html#basic_ops_restarting

-David


JVM crash when closing persistent store (rocksDB)

2016-10-11 Thread Pierre Coquentin
Hi,

I have a simple test where I create a topology builder with one topic, one
processor using a persistent store, then I create a kafka streams, start
it, wait a bit, then close. Each time, the jvm crashes (seg fault) when
flushing the data. Anyone has already met this kind of problem ?

OS:
Ubuntu 16.04

JVM:
openjdk version "1.8.0_91"
OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.04.1-b14)
OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)

Kafka:
kafka_2.11-0.10.0.1

The java code to reproduce the problem:

public static void main(String[] args) throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
"localhost:2181");
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
StringSerde.class.getName());
configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
StringSerde.class.getName());

TopologyBuilder builder = new TopologyBuilder()
.addSource("source", "test")
.addProcessor("processor", () -> new Processor<String, String>() {

private KeyValueStore<String, String> kvStore;

public void init(ProcessorContext context) {
kvStore = (KeyValueStore<String, String>)
context.getStateStore("store");
}

public void process(String key, String value) {

}

public void close() {
kvStore.close();
}

@Override
public void punctuate(long timestamp) {


}
}, "source")
  .addStateStore(Stores.create("store").withKeys(new
StringSerde()).withValues(new StringSerde()).persistent().build(),
"processor");


KafkaStreams streams = new KafkaStreams(builder, new
StreamsConfig(configs));
streams.start();
TimeUnit.SECONDS.sleep(20);
streams.close();
}


There is the log:


11.10.2016 17:27:11 [main] INFO
 [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
replication.factor = 1
num.standby.replicas = 0
metric.reporters = []
commit.interval.ms = 3
bootstrap.servers = [localhost:9092]
state.dir = /tmp/kafka-streams
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
state.cleanup.delay.ms = 6
poll.ms = 100
zookeeper.connect = localhost:2181
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metrics.sample.window.ms = 3
buffered.records.per.partition = 1000
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor = class
org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
num.stream.threads = 1
metrics.num.samples = 2
application.id = test
client.id =

11.10.2016 17:27:11 [main] INFO
 [org.apache.kafka.streams.processor.internals.StreamThread:170] Creating
producer client for stream thread [StreamThread-1]
11.10.2016 17:27:11 [main] INFO
 [org.apache.kafka.clients.producer.ProducerConfig:178] ProducerConfig
values:
metric.reporters = []
metadata.max.age.ms = 30
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 6
interceptor.classes = null
ssl.truststore.password = null
client.id = test-1-StreamThread-1-producer
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 3
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 6
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 100

11.10.2016 17:27:11 [main] INFO
 [org.apache.kafka.clients.producer.ProducerConfig:178] ProducerConfig
values:
metric.reporters = []
metadata.max.age.ms = 30
reconnect.

Training Kafka and ZooKeeper - Monitoring and Operability

2016-10-11 Thread Nicolas Motte
Hi everyone,

I created a training for Application Management and OPS teams in my company.
Some sections are specific to our deployment, but most of them are generic
and explain how Kafka and ZooKeeper work.

I uploaded it on SlideShare, I thought it might be useful to other people:
http://fr.slideshare.net/NicolasMotte/training-kafka-and-zookeeper-monitoring-and-operability

In the description you will get a link to the version with audio
description.

Cheers
Nico


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-11 Thread Guozhang Wang
Ali,

We are working on moving from Java7 to Java8 in Apache Kafka, and the
Streams client is one of the motivations doing so. Stay tuned on the
mailing list when it will come.

Currently Streams won't automatically filter out null values for you since
in some other cases they may have semantic meanings and cannot be simply
ignored; you can, though, apply a simple filter such like "filter((key,
value) => value != null)" before your processor lambda operator, if it
looks clearer in your code.
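
For example, a minimal sketch of that filtering approach in the Streams DSL;
the topic names, the MyEvent type, its serde, and transform() are assumptions
made for illustration:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, MyEvent> events =
        builder.stream(Serdes.String(), myEventSerde, "events-topic");

events
    // Drop records that the value serde could not parse (and hence returned
    // as null), so the downstream logic never has to null-check again.
    .filter((key, value) -> value != null)
    .mapValues(event -> transform(event))
    .to(Serdes.String(), resultSerde, "output-topic");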

Guozhang


On Sun, Oct 9, 2016 at 3:14 PM, Ali Akhtar  wrote:

> It isn't a fatal error. It should be logged as a warning, and then the
> stream should be continued w/ the next message.
>
> Checking for null is 'ok', in the sense that it gets the job done, but
> after java 8's release, we really should be using optionals.
>
> Hopefully we can break compatibility w/ the bad old days soon and move into
> the future.
>
> (If there's a way to do the null check automatically, i.e before calling
> the lambda, please let me know).
>
> On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang  wrote:
>
> > Ali,
> >
> > In your scenario, if serde fails to parse the bytes should that be
> treated
> > as a fatal failure or it is expected?
> >
> > In the former case, instead of returning a null I think it is better to
> > throw a runtime exception in order to let the whole client to stop and
> > notify the error; in the latter case, returning and checking null looks
> > fine to me.
> >
> >
> > Guozhang
> >
> > On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar  wrote:
> >
> > > Hey G,
> > >
> > > Looks like the only difference is a valueSerde parameter.
> > >
> > > How does that prevent having to look for nulls in the consumer?
> > >
> > > E.g, I wrote a custom Serde which converts the messages (which are json
> > > strings) into a Java class using Jackson.
> > >
> > > If the json parse fails, it sends back a null.
> > >
> > > When I'm reading this stream, in my callback, how would I prevent
> having
> > to
> > > check if the serialized value isn't null?
> > >
> > > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Ali,
> > > >
> > > > We do have corresponding overloaded functions for most of KStream /
> > > KTable
> > > > operators to avoid enforcing users to specify "null"; in these cases
> > the
> > > > default serdes specified in the configs are then used. For example:
> > > >
> > > >  KTable aggregate(Initializer initializer,
> > > >Aggregator adder,
> > > >Aggregator subtractor,
> > > >Serde aggValueSerde,
> > > >String storeName);
> > > >
> > > > /**
> > > >  * .. using default serializers and deserializers.
> > > >  */
> > > >  KTable aggregate(Initializer initializer,
> > > >Aggregator adder,
> > > >Aggregator subtractor,
> > > >String storeName);
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll 
> > > wrote:
> > > >
> > > > > Ali, the Apache Kafka project still targets Java 7, which means we
> > > can't
> > > > > use Java 8 features just yet.
> > > > >
> > > > > FYI: There's on ongoing conversation about when Kafka would move
> from
> > > > Java
> > > > > 7 to Java 8.
> > > > >
> > > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar 
> > > wrote:
> > > > >
> > > > > > Since we're using Java 8 in most cases anyway, Serdes /
> > Serialiazers
> > > > > should
> > > > > > use options, to avoid having to deal with the lovely nulls.
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: JVM crash when closing persistent store (rocksDB)

2016-10-11 Thread Eno Thereska
Hi Pierre,

I tried the exact code on MacOs and am not getting any errors. Could you check 
if all the directories in /tmp where Kafka Streams writes the RocksDb files are 
empty? I'm wondering if there is some bad state left over. 

Finally looks like you are running 0.10.0, could you try running trunk to see 
if the problem still exists? I'm running trunk. I know there were a couple of 
RocksDb fixes after 0.10.0

Thanks
Eno

> On 11 Oct 2016, at 16:41, Pierre Coquentin  wrote:
> 
> Hi,
> 
> I have a simple test where I create a topology builder with one topic, one
> processor using a persistent store, then I create a kafka streams, start
> it, wait a bit, then close. Each time, the jvm crashes (seg fault) when
> flushing the data. Anyone has already met this kind of problem ?
> 
> OS:
> Ubuntu 16.04
> 
> JVM:
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.04.1-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> 
> Kafka:
> kafka_2.11-0.10.0.1
> 
> The java code to reproduce the problem:
> 
> public static void main(String[] args) throws InterruptedException {
>Map configs = new HashMap<>();
>configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> "localhost:2181");
>configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> StringSerde.class.getName());
>configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> StringSerde.class.getName());
> 
>TopologyBuilder builder = new TopologyBuilder()
>.addSource("source", "test")
>.addProcessor("processor", () -> new Processor String>() {
> 
>private KeyValueStore kvStore;
> 
>public void init(ProcessorContext context) {
>kvStore = (KeyValueStore)
> context.getStateStore("store");
>}
> 
>public void process(String key, String value) {
> 
>}
> 
>public void close() {
>kvStore.close();
>}
> 
>@Override
>public void punctuate(long timestamp) {
> 
> 
>}
>}, "source")
>  .addStateStore(Stores.create("store").withKeys(new
> StringSerde()).withValues(new StringSerde()).persistent().build(),
> "processor");
> 
> 
>KafkaStreams streams = new KafkaStreams(builder, new
> StreamsConfig(configs));
>streams.start();
>TimeUnit.SECONDS.sleep(20);
>streams.close();
>}
> 
> 
> There is the log:
> 
> 
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
> replication.factor = 1
> num.standby.replicas = 0
> metric.reporters = []
> commit.interval.ms = 3
> bootstrap.servers = [localhost:9092]
> state.dir = /tmp/kafka-streams
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> state.cleanup.delay.ms = 6
> poll.ms = 100
> zookeeper.connect = localhost:2181
> key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> metrics.sample.window.ms = 3
> buffered.records.per.partition = 1000
> value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> timestamp.extractor = class
> org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
> num.stream.threads = 1
> metrics.num.samples = 2
> application.id = test
> client.id =
> 
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.streams.processor.internals.StreamThread:170] Creating
> producer client for stream thread [StreamThread-1]
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.clients.producer.ProducerConfig:178] ProducerConfig
> values:
> metric.reporters = []
> metadata.max.age.ms = 30
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [localhost:9092]
> ssl.keystore.type = JKS
> sasl.mechanism = GSSAPI
> max.block.ms = 6
> interceptor.classes = null
> ssl.truststore.password = null
> client.id = test-1-StreamThread-1-producer
> ssl.endpoint.identification.algorithm = null
> request.timeout.ms = 3
> acks = 1
> receive.buffer.bytes = 32768
> ssl.truststore.type = JKS
> retries = 0
> ssl.truststore.location = null
> ssl.keystore.password = null
> send.buffer.bytes = 131072
> compression.type = none
> metadata.fetch.timeout.ms = 6
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> sasl.kerberos.min.time.before.relogin = 6
> connec

Re: JVM crash when closing persistent store (rocksDB)

2016-10-11 Thread Pierre Coquentin
Hi,

I already tried to store the RocksDB files somewhere else by specifying the
Kafka state dir property, but no luck, same behavior.
I will try to run with trunk tomorrow to see if it stops correctly,
and I will keep you informed. There must be something with my configuration,
because I googled and saw nothing about this problem.

On Tue, Oct 11, 2016 at 9:28 PM, Eno Thereska 
wrote:

> Hi Pierre,
>
> I tried the exact code on MacOs and am not getting any errors. Could you
> check if all the directories in /tmp where Kafka Streams writes the RocksDb
> files are empty? I'm wondering if there is some bad state left over.
>
> Finally looks like you are running 0.10.0, could you try running trunk to
> see if the problem still exists? I'm running trunk. I know there were a
> couple of RocksDb fixes after 0.10.0
>
> Thanks
> Eno
>
> > On 11 Oct 2016, at 16:41, Pierre Coquentin 
> wrote:
> >
> > Hi,
> >
> > I have a simple test where I create a topology builder with one topic,
> one
> > processor using a persistent store, then I create a kafka streams, start
> > it, wait a bit, then close. Each time, the jvm crashes (seg fault) when
> > flushing the data. Anyone has already met this kind of problem ?
> >
> > OS:
> > Ubuntu 16.04
> >
> > JVM:
> > openjdk version "1.8.0_91"
> > OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.
> 04.1-b14)
> > OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> >
> > Kafka:
> > kafka_2.11-0.10.0.1
> >
> > The java code to reproduce the problem:
> >
> > public static void main(String[] args) throws InterruptedException {
> >Map configs = new HashMap<>();
> >configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> > "localhost:2181");
> >configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092");
> >configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> >configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > StringSerde.class.getName());
> >configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > StringSerde.class.getName());
> >
> >TopologyBuilder builder = new TopologyBuilder()
> >.addSource("source", "test")
> >.addProcessor("processor", () -> new Processor > String>() {
> >
> >private KeyValueStore kvStore;
> >
> >public void init(ProcessorContext context) {
> >kvStore = (KeyValueStore)
> > context.getStateStore("store");
> >}
> >
> >public void process(String key, String value) {
> >
> >}
> >
> >public void close() {
> >kvStore.close();
> >}
> >
> >@Override
> >public void punctuate(long timestamp) {
> >
> >
> >}
> >}, "source")
> >  .addStateStore(Stores.create("store").withKeys(new
> > StringSerde()).withValues(new StringSerde()).persistent().build(),
> > "processor");
> >
> >
> >KafkaStreams streams = new KafkaStreams(builder, new
> > StreamsConfig(configs));
> >streams.start();
> >TimeUnit.SECONDS.sleep(20);
> >streams.close();
> >}
> >
> >
> > There is the log:
> >
> >
> > 11.10.2016 17:27:11 [main] INFO
> > [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
> > replication.factor = 1
> > num.standby.replicas = 0
> > metric.reporters = []
> > commit.interval.ms = 3
> > bootstrap.servers = [localhost:9092]
> > state.dir = /tmp/kafka-streams
> > partition.grouper = class
> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > state.cleanup.delay.ms = 6
> > poll.ms = 100
> > zookeeper.connect = localhost:2181
> > key.serde = class org.apache.kafka.common.serialization.Serdes$
> StringSerde
> > metrics.sample.window.ms = 3
> > buffered.records.per.partition = 1000
> > value.serde = class org.apache.kafka.common.serialization.Serdes$
> StringSerde
> > timestamp.extractor = class
> > org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
> > num.stream.threads = 1
> > metrics.num.samples = 2
> > application.id = test
> > client.id =
> >
> > 11.10.2016 17:27:11 [main] INFO
> > [org.apache.kafka.streams.processor.internals.StreamThread:170] Creating
> > producer client for stream thread [StreamThread-1]
> > 11.10.2016 17:27:11 [main] INFO
> > [org.apache.kafka.clients.producer.ProducerConfig:178] ProducerConfig
> > values:
> > metric.reporters = []
> > metadata.max.age.ms = 30
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > bootstrap.servers = [localhost:9092]
> > ssl.keystore.type = JKS
> > sasl.mechanism = GSSAPI
> > max.block.ms = 6
> > interceptor.classes = null
> > ssl.truststore.password = null
> > client.id = test-1-StreamThread-1-producer
> > ssl.endpoint.identification.algorithm = null
> > request.timeout.ms = 3
> > acks = 1
> > receive.buffer.

KTable and KStream should share an interface

2016-10-11 Thread Ali Akhtar
They both have a lot of the same methods, and yet they can't be used
polymorphically because they don't share the same parent interface.

I think KIterable or something like that should be used as their base
interface w/ shared methods.


Safely deleting all existing topics?

2016-10-11 Thread Ali Akhtar
In development, I often need to delete all existing data in all topics, and
start over.

My process for this currently is: stop zookeeper, stop kafka broker, rm -rf
~/kafka/data/*

But when I bring the broker back on, it often prints a bunch of errors and
needs to be restarted before it actually works.

I'm assuming some info is being persisted on Zookeeper.

Any ideas which other data I need to delete to start over w/ a clean slate?

Thanks.


Re: Understanding org.apache.kafka.streams.errors.TopologyBuilderException

2016-10-11 Thread Guozhang Wang
Sachin,

Just a side note in addition to what Matthias mentioned: in the coming
0.10.1.0 release Kafka Streams has added a feature to do auto-repartitioning
by detecting whether the message keys are still joinable or not. To give a
few examples:

---

stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2");

stream1.join(stream2);  // Streams will check if
topic1 and topic2 are co-partitioned (i.e. having same number of
partitions, if not error out)


---

stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2").selectKey(...);

stream1.join(stream2);  // Streams will assume that
stream2 is not co-partitioned with stream1 since its key has changed, and
hence auto re-partition stream2, in other words, it is re-written behind
the scene as:


stream1 = builder.stream("topic1");
stream2 =
builder.stream("topic2").selectKey(...).through("repartition-topic");  //
"repartition-topic" is created on-the-fly with the same number of
partitions as topic1.

stream1.join(stream2);



Stay tuned on the release docs for such new features in the upgrade / API
changes section.
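
For example, a minimal sketch of the manual (pre-0.10.1) workaround that
Matthias describes in the quoted thread below; the topic names and
extractJoinKey() are assumptions, and "topic2-repartitioned" must be created
beforehand with the same number of partitions as "topic1":

KStream<String, String> stream1 = builder.stream("topic1");

KStream<String, String> stream2 = builder.stream("topic2")
        // selectKey() changes the key, so this stream is no longer guaranteed
        // to be co-partitioned with stream1 ...
        .selectKey((key, value) -> extractJoinKey(value))
        // ... therefore redistribute it through a co-partitioned topic
        // before joining or aggregating.
        .through("topic2-repartitioned");

// stream1 and stream2 are now co-partitioned and can be joined safely.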

Guozhang

On Sun, Oct 9, 2016 at 12:20 AM, Sachin Mittal  wrote:

> Thanks for pointing this out.
> I am doing exactly like this now and it is working fine.
>
> Sachin
>
>
> On Sun, Oct 9, 2016 at 12:32 PM, Matthias J. Sax 
> wrote:
>
> > You must ensure, that both streams are co-partitioned (ie, same number
> > of partitions and using the join key).
> >
> > (see "Note" box:
> > http://docs.confluent.io/3.0.1/streams/developer-guide.
> > html#joining-streams)
> >
> > You can enforce co-partitioning by introducing a call to .through()
> > before doing the join (on either one or both input streams). You need to
> > insert .through() for an input stream, if you did (potentially) modify
> > the key (eg, you did apply .selectKey(), map(), or flatMap() before the
> > join).
> >
> > If one stream's key is not modified, it is sufficient to only
> > re-distribute the other stream via .through(). Also keep in mind, that
> > you should create the topic use in .through() manually with the right
> > number of partitions before you start you application.
> >
> >
> > -Matthias
> >
> > On 10/08/2016 08:54 AM, Sachin Mittal wrote:
> > > I don't think that is the issue.
> > > The join api says:
> > > public  KTable join(KTable other, ValueJoiner V1,
> > R>
> > > joiner)
> > > In my case V is Map>
> > > V1 is List
> > > R is Map>
> > > K is String
> > >
> > > Note the final V and V1 are arrived after doing transformation on
> > original
> > > streams
> > > 
> > >
> > > So there are intermediate steps like
> > > stream.map(new KeyValueMapper>())
> > > and
> > > table.mapValues(new ValueMapper>()
> > >
> > > So whenever I modify the structure of a stream or table do I need to
> back
> > > it up with a new kafka topic calling through("new-mapped-topic") ?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Sat, Oct 8, 2016 at 7:29 PM, Martin Gainty 
> > wrote:
> > >
> > >>
> > >>
> > >>
> > >>> From: sjmit...@gmail.com
> > >>> Date: Sat, 8 Oct 2016 15:30:33 +0530
> > >>> Subject: Understanding org.apache.kafka.streams.errors.
> > >> TopologyBuilderException
> > >>> To: users@kafka.apache.org
> > >>>
> > >>> Hi,
> > >>> I am getting this exception
> > >>>
> > >>> Exception in thread "main"
> > >>> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> > >> topology
> > >>> building: KTABLE-MAPVALUES-07 and
> KSTREAM-AGGREGATE-09
> > >> are
> > >>> not joinable
> > >>>
> > >> MG>look at join declaration for org.apache.kafka.streams.
> > >> kstream.internals.KTableImpl.java
> > >> MG> public  KTable join(KTable other,
> ValueJoiner > >> V1, R> joiner)
> > >> MG>method join assumes 2 collections that exactly match the generic
> > >> declaration of join method
> > >>
> > >> MG>KTable>> !=  KTable>
> > (2nd
> > >> parameter is missing both V and R declarators)
> > >> MG>you can establish a new collection of KTable>
> > >>
> > >> MG>and then *join* KTable>>  into
> > >> KTable>  thru custom join method
> > >>
> > >>> What I am trying to do is I aggregate a KStream into a KTable of type
> > >>> KTable>>
> > >>>
> > >>> and I am trying to join it to another KStream which is aggregated
> into
> > >>> another KTable of type
> > >>>  KTable>
> > >>>
> > >>> Since keys of both the final KTable are same, I don't understand why
> it
> > >> is
> > >>> giving this exception.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang


Re: Re: I found kafka lost message

2016-10-11 Thread Guozhang Wang
One common issue of lost messages is that consumer auto-committing (related
config is "auto.commit.enabled", "commit.interval.ms"): from the Kafka
consumer point of view, once the messages are returned from the "poll" call
they are considered "consumed", and if committing offsets is called it will
set the offset to the last message's offset. So if you have some exceptions
/ errors in the middle of processing some of your consumed records, upon
restarting they will not be re-fetched again, hence possibly causing "data
loss" on your end.


Guozhang

On Mon, Oct 10, 2016 at 3:33 AM,  wrote:

> Hi Guozhang,
> At first, thank you answer my question, and give me some suggest.
> But, I'm sure I readed some introduction about kafka.
>
> In my producer, My Code is( c code):
> res = rd_kafka_conf_set(kafka_conf, "request.required.acks", "-1", NULL,
> 0);
> res = rd_kafka_topic_conf_set( topic_conf, "produce.offset.report",
> "true", errstr, sizeof(errstr) );
>
> In my consumer, My Code is(kafka-python):
>
> self.__consumer = KafkaConsumer( bootstrap_servers=self.__
> kafka_config["hosts"],
> group_id=self.__kafka_config["
> group"],
> auto_offset_reset="earliest",
> )
> self.__consumer.subscribe([self.__kafka_config["data_
> topic"]])
>
> for message in self.__consumer:
>
>
> I think these codes is common, What's your suggest about these codes?
>
> In the end, I must explain: Lost message is not often, some time, couple
> days can find one or two lost messages.
> But some day, maybe can find over 20 messages were losted.
> Our message over 1,200,000 in one day.
>
> So, do your have any suggest?
> By the way, can you speak Chinese?
> Thank you very much & Best wishes
> Jerry
>
>
> 
>
>
> - Original Message -
> From: Guozhang Wang 
> To: "users@kafka.apache.org" , yangy...@sina.com
> Subject: Re: I found kafka lost message
> Date: 2016-10-10 01:25
>
> Jerry,
>
> Message lost scenarios usually are due to producer and consumer
> mis-configured. Have you read about the client configs web docs?
>
> http://kafka.apache.org/documentation#producerconfigs
>
> http://kafka.apache.org/documentation#newconsumerconfigs
>
>
> If not I'd suggest you reading those first and see if you can tune some of
> these configs to have better delivery guarantees.
>
> Guozhang
>
>
> On Fri, Oct 7, 2016 at 9:48 PM,  wrote:
>
> Hello every body,I build a kafka cluster(include 5 domains) use
> kafka_2.11-0.10.0.0 and kafka-python-1.3.1.I create a topic by 100
> partitions and 2 replicate, then use 20 consumers to receive message.
> But, I found sometime the kafka lost message! I found some partition's
> offsite lost at consumer.After, I make a global index for every message
> from producer for confirm this problem, and I also found the global index
> had been break!
> Why the kafka lost message?  What can I do to fix the problem?
> Thanks!Jerry
>
> 
>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: Frequent Consumer Rebalance/ Commit fail exception

2016-10-11 Thread Guozhang Wang
Hello Rahul,

This "CommitFailedException" usually means that the consumer group
coordinator that sits on the server side has decided that this consumer is
"failed" from the heartbeat protocol and hence kicked out of the group, and
later when it sees a commit-offset request from this consumer it will just
reject the request with "rebalance in progress" error code.

Since you mentioned that you still periodically call poll just to send the
heartbeat, I'd suspect this is because the heartbeat is somehow not sent in
time. Consider either increasing the polling frequency while the topics are
paused, or setting the "session.timeout.ms" value larger so that the group is
less sensitive to heartbeat delays.


BTW, in the upcoming 0.10.1.0 release a new feature (KIP-62:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread)
is added to the consumer, where the session timeout is separated from a new
config named "processing.timeout.ms", so that you can set the latter config
to be much higher (the default value is around 5 minutes) and the
consume-then-process pattern can be supported more conveniently than with the
pause-and-poll pattern. Hope it helps.
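
For illustration, a rough sketch of the pause-and-poll pattern described
above (not Rahul's actual code).  It assumes a client where pause()/resume()
take a Collection<TopicPartition>, as in 0.10.1+; older clients use a varargs
signature.  The running flag, the consumer, the executor and process() are
assumptions:

ExecutorService executor = Executors.newSingleThreadExecutor();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (records.isEmpty()) {
        continue;
    }

    consumer.pause(consumer.assignment());   // stop fetching while the batch is processed
    Future<?> work = executor.submit(() -> process(records));

    while (!work.isDone()) {
        // Returns no records while paused, but keeps the consumer active in
        // the group so the coordinator does not kick it out mid-processing.
        consumer.poll(100);
    }

    consumer.resume(consumer.assignment());
    consumer.commitSync();                   // commit only after processing succeeded
}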


Guozhang


On Mon, Oct 10, 2016 at 12:13 PM, Misra, Rahul 
wrote:

> Hi,
>
> I have a custom Kafka consumer which reads messages from a topic, hands
> over the processing of the messages  to a different thread, and while the
> messages are being processed, it pauses the topic and keeps polling the
> Kafka topic (to maintain heartbeats) and also commits offsets using
> commitSync() once the processing thread returns success.
> This consumer is the only consumer in its group. Auto commit for offsets
> is set to false.
>
> The consumer also registers the onPartitionsAssigned() and
> onPartitionsRevoked() listeners.
>
> Recently I observed that the consumer frequently crashes (if consuming
> large number of messages) with the following exception:
>
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit
> cannot be completed due to group rebalance at org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:546)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:487)
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:681)
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:654)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture$1.onSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture.fireSuccess(RequestFuture.java:133)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture.complete(RequestFuture.java:107)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(
> ConsumerNetworkClient.java:350)
> at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:288)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
>
> None of the rebalance listeners were called before this exception.
> Could somebody suggest why this rebalancing is being triggered, always
> while committing the offsets (or is the actual issue somewhere else?)
>
> Regards,
> Rahul Misra
>
> This email message and any attachments are intended solely for the use of
> the addressee. If you are not the intended recipient, you are prohibited
> from reading, disclosing, reproducing, distributing, disseminating or
> otherwise using this transmission. If you have received this message in
> error, please promptly notify the sender by reply email and immediately
> delete this message from your system. This message and any attachments may
> contain information that is confidential, privileged or exempt from
> disclosure. Delivery of this message to any person other than the intended
> recipient is not intended to waive any right or privilege. Message
> transmission is not guaranteed to be secure or free of software viruses.
> 
> ***
>



-- 
-- Guozhang


Re: How to use RPC mechanism in Kafka?

2016-10-11 Thread Fernando Oliveira
You could have every producer consume from a specific partition of the response
topic (no high-level consumer) and send that partition number in the message...
when the consumer processes the data, it sends the answer to that specific
partition.
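
For illustration, a rough sketch of that request/reply idea with the Java
clients; the topic names, the reply-partition convention and the value format
are assumptions, not an existing Kafka API:

// Requester: own one partition of the reply topic and name it in the request.
int replyPartition = 7;
producer.send(new ProducerRecord<String, String>("requests", null,
        replyPartition + "|" + requestBody));

// Requester: read replies only from that partition, bypassing the group protocol.
KafkaConsumer<String, String> replyConsumer = new KafkaConsumer<>(consumerProps);
replyConsumer.assign(Collections.singletonList(
        new TopicPartition("replies", replyPartition)));

// Responder: after processing a request, send the answer to the partition it named.
producer.send(new ProducerRecord<String, String>("replies", replyPartition,
        null, responseBody));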

Sent from my iPhone

Re: Safely deleting all existing topics?

2016-10-11 Thread Guozhang Wang
Ali,

When I did testing / development, I usually delete zk directory as well
(default is /tmp/zookeeper) for clean up.


Guozhang


On Tue, Oct 11, 2016 at 3:33 PM, Ali Akhtar  wrote:

> In development, I often need to delete all existing data in all topics, and
> start over.
>
> My process for this currently is: stop zookeeper, stop kafka broker, rm -rf
> ~/kafka/data/*
>
> But when I bring the broker back on, it often prints a bunch of errors and
> needs to be restarted before it actually works.
>
> I'm assuming some info is being persisted on Zookeeper.
>
> Any ideas which other data I need to delete to start over w/ a clean slate?
>
> Thanks.
>



-- 
-- Guozhang


Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Ratha v
HI Michael;

Really appreciate the clear explanation.
I modified my code as you mentioned. I have written a custom Serde,
serializer and deserializer.
But now the problem I see is that the two topics are not merged. That is,
messages in the 'sourceTopic' are not passed to the 'targetTopic' (the
'targetTopic' has 0 messages).
I do not see any exceptions.

Here is my custom serde, serializer/deserializer and the logic. I also have a
properties file where I defined the following parameters:

*bootstrap.servers=xx.com \:9092,xx.com
\:9092,xx.com \:9092*

*key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde*

*value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde*

*application.id =stream-pipe*


Do you see any issue here? Why are messages not written to 'targetTopic'?



*LOGIC*

/**

* create stream from source topics and write it to the target topic

* @param sourceTopics

* @param targetTopic

*/

public void write(String[] sourceTopics, String targetTopic) {

 KafkaStreams streams = null;

 KStreamBuilder builder = new KStreamBuilder();

  try {

   KStream<String, KafkaPayload> kafkaPayloadStream = builder
.stream(stringSerde, kafkaPayloadSerde, sourceTopics);

   kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic
);

   streams = new KafkaStreams(builder, properties);

   streams.start();

   Thread.sleep(5000L);

  } catch (InterruptedException e) {

  log.warn(e);

 } catch (Exception e) {

 log.error("Topic merge failed. ",e);

  } finally {

   if (streams != null) {

   streams.close();

 }

}

}




*SERDE*


public class KafkaPayloadSerdes {

    static private class WrapperSerde<T> implements Serde<T> {
        final private Serializer<T> serializer;
        final private Deserializer<T> deserializer;

        public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            serializer.configure(configs, isKey);
            deserializer.configure(configs, isKey);
        }

        @Override
        public void close() {
            serializer.close();
            deserializer.close();
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }

    static public final class KafkaPayloadSerde extends WrapperSerde<KafkaPayload> {
        public KafkaPayloadSerde() {
            super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
        }
    }

    /**
     * A serde for the nullable <KafkaPayload> type.
     */
    static public Serde<KafkaPayload> KafkaPayload() {
        return new KafkaPayloadSerde();
    }

}


*Serializer/Deserializer*



public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
        Deserializer<KafkaPayload> {

    private static final Logger log = org.apache.logging.log4j.LogManager
            .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());

    @Override
    public KafkaPayload deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;    // guard against null values (e.g. tombstones)
        }
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        ObjectInput in = null;
        Object obj = null;
        try {
            in = new ObjectInputStream(bis);
            obj = in.readObject();
        } catch (IOException e) {
            log.error(e);
        } catch (ClassNotFoundException e) {
            log.error(e);
        } finally {
            try {
                bis.close();
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                log.error(ex);
            }
        }
        return (KafkaPayload) obj;
    }

    @Override
    public void close() {
        // nothing to close
    }

    @Override
    public byte[] serialize(String topic, KafkaPayload kpayload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutput out = null;
        byte[] payload = null;
        try {
            out = new ObjectOutputStream(bos);
            out.writeObject(kpayload);
            out.flush();    // flush before copying, so no buffered bytes are lost
            payload = bos.toByteArray();
        } catch (IOException e) {
            log.error(e);
        } finally {
            try {
                if (out != null) {
                    out.close();
                    bos.close();
                }
            } catch (Exception ex) {
                log.error(ex);
            }
        }
        return payload;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

}
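
One thing worth double-checking with this Java-serialization approach (an assumption on my
part, since the KafkaPayload class itself isn't shown): the payload class must implement
java.io.Serializable. If it doesn't, writeObject() throws a NotSerializableException, which
this serializer only logs, and you end up producing null payloads with no obvious error.
A minimal sketch of what the class needs:

import java.io.Serializable;

// Hypothetical shape of the payload class; the important part is "implements Serializable".
public class KafkaPayload implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String body;

    // getters / setters omitted
}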



On 11 October 2016 at 20:13, Michael Noll  wrote:

> When I wrote:
>
> "If you haven't changed to default key and value serdes, then `to()`
> will fail because [...]"
>
> it should have read:
>
> "If you haven't changed the default key and value serdes, then `to()`
> will fail because [...]"
>
>
>
> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll 
> wrote:
>
> > Ratha,
> >
> > if you based your problematic code on the PipeDemo example, then you
> > should have these two lines in your code (which most probably you haven't
> > changed):
> >
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> >
> > This configures your application to interpret (= encode/decode), by
> > default, the keys and values of any messages it reads from Kafka as
> > strings.  This works for the PipeDemo example because the keys and values
> > are actually strings.
> >
> > In your

RE: Frequent Consumer Rebalance/ Commit fail exception

2016-10-11 Thread Misra, Rahul
Thanks Guozhang for the detailed explanation.
It was really helpful.

Regards,
Rahul Misra


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Wednesday, October 12, 2016 6:25 AM
To: users@kafka.apache.org
Subject: Re: Frequent Consumer Rebalance/ Commit fail exception

Hello Rahul,

This "CommitFailedException" usually means that the consumer group coordinator 
that sits on the server side has decided that this consumer is "failed" from 
the heartbeat protocol and hence kicked out of the group, and later when it 
sees a commit-offset request from this consumer it will just reject the request 
with "rebalance in progress" error code.

Since you mentioned that you still periodically call poll just toe send the 
heartbeat I'd suspect this is because the heartbeat is not somehow sent in 
time. Consider either increase the polling frequency while pausing the topics, 
or set the "session.timeout.ms" value larger so that it is less sensitive to 
heartbeat delays.
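
For reference, a minimal sketch of the pause-and-poll pattern described above (this assumes
the 0.10.0.x consumer API, where pause()/resume() take a Collection; the topic, group and
timeout values are made up):

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PauseAndPollSketch {

    private static final ExecutorService pool = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");   // raise this if heartbeats still arrive late
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (records.isEmpty()) {
                continue;
            }

            Future<?> result = processAsync(records);   // hand off to the processing thread

            consumer.pause(consumer.assignment());      // stop fetching, but keep the group alive
            while (!result.isDone()) {
                consumer.poll(100);                     // frequent polls keep the heartbeat going
            }
            consumer.resume(consumer.assignment());

            consumer.commitSync();                      // commit only after processing succeeded
        }
    }

    // Stand-in for the real processing hand-off.
    private static Future<?> processAsync(ConsumerRecords<String, String> records) {
        return pool.submit(() -> records.forEach(r -> System.out.println(r.value())));
    }
}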


BTW, in the upcoming 0.10.1.0 release a new feature (KIP-62:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread)
is added to the consumer, where the session timeout is separated from a new config named 
"max.poll.interval.ms". You can set the latter to a much higher value (the default is 5 
minutes), so the consume-then-process pattern can be supported more conveniently than with 
the pause-and-poll pattern. Hope it helps.


Guozhang


On Mon, Oct 10, 2016 at 12:13 PM, Misra, Rahul 
wrote:

> Hi,
>
> I have a custom Kafka consumer which reads messages from a topic, 
> hands over the processing of the messages  to a different thread, and 
> while the messages are being processed, it pauses the topic and keeps 
> polling the Kafka topic (to maintain heartbeats) and also commits 
> offsets using
> commitSync() once the processing thread returns success.
> This consumer is the only consumer in its group. Auto commit for 
> offsets is set to false.
>
> The consumer also registers the onPartitionsAssigned() and
> onPartitionsRevoked() listeners.
>
> Recently I observed that the consumer frequently crashes (if consuming 
> large number of messages) with the following exception:
>
>
> org.apache.kafka.clients.consumer.CommitFailedException: 
> Commit cannot be completed due to group rebalance at org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:546)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator$OffsetCommitResponseHandler.
> handle(ConsumerCoordinator.java:487)
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:681)
> at org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator$CoordinatorResponseHandler.
> onSuccess(AbstractCoordinator.java:654)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture$1.onSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture.fireSuccess(RequestFuture.java:133)
> at org.apache.kafka.clients.consumer.internals.
> RequestFuture.complete(RequestFuture.java:107)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(
> ConsumerNetworkClient.java:350)
> at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:288)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
> at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
>
> None of the rebalance listeners were called before this exception.
> Could somebody suggest why this rebalancing is being triggered, always 
> while committing the offsets (or is the actual issue somewhere else?)
>
> Regards,
> Rahul Misra
>

Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Ratha v
Hi Michael;
Sorry, after setting "auto.offset.reset" to 'earliest', I now see messages
in my 'targetTopic'.
But I still get the class cast exception when I consume messages from the
'targetTopic'. (To consume messages I use the KafkaConsumer high-level API.)

*ConsumerRecords<String, KafkaPayload> records = consumer.poll(Long.MAX_VALUE);*



*Exception*

*java.lang.ClassCastException: java.lang.String cannot be cast to
xxx.core.kafkamodels.KafkaPayload at
xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108)
~[classes/:?]*

at
xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68)
~[classes/:?]

at
xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50)
~[classes/:?]

at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]

at
com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50)
[classes/:?]

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_66]

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[?:1.8.0_66]

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[?:1.8.0_66]

at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]



On 12 October 2016 at 13:19, Ratha v  wrote:

> HI Michael;
>
> Really appreciate for the clear explanation..
> I modified my code as you mentioned. I have written custom, Serde,
> serializer,deserializer.
> But now the problem i see is, both topics are not merged. Means, Messages
> in the 'sourcetopic' not to passed to 'targetTopic' . ('targetTopic has '0'
> messages)
> I do not see any exceptions.
>
> Here is my custom serde, serializer/deserializer and the logic; Also I
> have properties file where i defined  following parameters;
>
> *bootstrap.servers=xx.com \:9092,xx.com
> \:9092,xx.com \:9092*
>
> *key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde*
>
> *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde*
>
> *application.id =stream-pipe*
>
>
> Do you see any issue here? Why messages are not written to ' targetTopic'?
>
>
>
> *LOGIC*
>
> /**
>
> * create stream from source topics and write it to the target topic
>
> * @param sourceTopics
>
> * @param targetTopic
>
> */
>
> public void write(String[] sourceTopics, String targetTopic) {
>
>  KafkaStreams streams = null;
>
>  KStreamBuilder builder = new KStreamBuilder();
>
>   try {
>
>KStream kafkaPayloadStream = builder
> .stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>
>kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde,
> targetTopic);
>
>streams = new KafkaStreams(builder, properties);
>
>streams.start();
>
>Thread.sleep(5000L);
>
>   } catch (InterruptedException e) {
>
>   log.warn(e);
>
>  } catch (Exception e) {
>
>  log.error("Topic merge failed. ",e);
>
>   } finally {
>
>if (streams != null) {
>
>streams.close();
>
>  }
>
> }
>
> }
>
>
>
>
> *SERDE*
>
>
> public class KafkaPayloadSerdes {
>
> static private class WrapperSerde implements
> Serde {
> final private Serializer serializer;
> final private Deserializer deserializer;
>
> public WrapperSerde(Serializer serializer,
> Deserializer deserializer) {
> this.serializer = serializer;
> this.deserializer = deserializer;
> }
>
> @Override
> public void configure(Map configs, boolean isKey) {
> serializer.configure(configs, isKey);
> deserializer.configure(configs, isKey);
> }
>
> @Override
> public void close() {
> serializer.close();
> deserializer.close();
> }
>
> @Override
> public Serializer serializer() {
> return serializer;
> }
>
> @Override
> public Deserializer deserializer() {
> return deserializer;
> }
> }
>
> static public final class KafkaPayloadSerde extends
> WrapperSerde {
> public KafkaPayloadSerde() {
> super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
> }
> }
>
> /**
> * A serde for nullable < KafkaPayload> type.
> */
> static public Serde KafkaPayload() {
> return new KafkaPayloadSerde();
> }
>
> }
>
>
> *Serilizer/Deserializer*
>
>
>
> public class KafkaPayloadSerializer implements Serializer,
> Deserializer {
>
> private static final Logger log = org.apache.logging.log4j.LogManager
> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
>
> @Override
> public KafkaPayload deserialize(String topic, byte[] arg1) {
> ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
> ObjectInput in = null;
> Object obj = null;
> try {
> in = new ObjectInputStream(bis);
> obj = in.readObject();
> } catch (IOException e) {
> log.error(e);
> } catch (ClassNotFoundException e) {
> log.error(e);
> } finally {
> try {
> bis.close();
> if (in != null) {
> in.close();
> }
> } catch (IOException ex) {
> log.error

Understanding out of order message processing w/ Streaming

2016-10-11 Thread Ali Akhtar
Heya,

Say I'm building a live auction site, with different products. Different
users will bid on different products. And each time they do, I want to
update the product's price, so it should always have the latest price in
place.

Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on the same
product 100 ms later.

The second bid arrives first and the price is updated to $5. Then the first
bid arrives. I want the price to not be updated in this case, as this bid
is older than the one I've already processed.

Here's my understanding of how I can achieve this with Kafka Streaming - is
my understanding correct?

- I have a topic for receiving bids. The topic has N partitions, and I have
N replicas of my application which hooks up w/ Kafka Streaming, up and
running.

- I assume each replica of my app will listen to a different partition of
the topic.

- A user makes a bid on product A.

- This is pushed to the topic with the key bid_a

- Another user makes a bid. This is also pushed with the same key (bid_a)

- The 2nd bid arrives first, and gets processed. Then the first (older) bid
arrives.

- Because I'm using a KTable, the timestamp of the messages is extracted,
and I'm not shown the older bid because I've already processed the later
bid. The older bid is ignored.

- All bids on product A go to the same topic partition, and hence the same
replica of my app, because they all have the key bid_a.

- Because of this, the replica already knows which timestamps it has
processed, and is able to ignore the older messages.

Is the above understanding correct?

Also, what will happen if bid 2 arrived and got processed, and then the
particular replica crashed, and was restarted. The restarted replica won't
have any memory of which timestamps it has previously processed.

So if bid 2 got processed, replica crashed and restarted, and then bid 1
arrived, what would happen in that case?

Thanks.
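
Not an authoritative answer, but one way to reason about the "keep only the newest bid per
product" part is that, whichever API you use, you need per-key state that remembers the
timestamp of the last accepted update, and that state has to be persisted somewhere to
survive a restart. A plain-Java sketch of just that logic (names and types are made up for
illustration):

import java.util.HashMap;
import java.util.Map;

// Per-key "latest timestamp wins" logic, shown outside the Streams DSL for clarity.
// In a real application these maps would have to live in a persistent store
// (a Streams state store, a DB, ...) so a restarted instance remembers what it accepted.
public class LatestBidTracker {

    private final Map<String, Long> lastAcceptedTimestamp = new HashMap<>();
    private final Map<String, Double> currentPrice = new HashMap<>();

    /** Returns true if the bid was accepted, i.e. it is newer than anything seen so far. */
    public boolean onBid(String productKey, double amount, long timestamp) {
        Long last = lastAcceptedTimestamp.get(productKey);
        if (last != null && timestamp <= last) {
            return false;                     // older or duplicate bid: ignore it
        }
        lastAcceptedTimestamp.put(productKey, timestamp);
        currentPrice.put(productKey, amount);
        return true;
    }

    public Double priceOf(String productKey) {
        return currentPrice.get(productKey);
    }
}

Because all bids for one product share the same key, they land in one partition and are
handled by one instance, which can apply exactly this check; whether the state survives a
crash is then a question of where those maps are stored.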


Re: In Kafka Streaming, Serdes should use Optionals

2016-10-11 Thread Ali Akhtar
Thanks. That filter() method is a good solution. But whenever I look at it,
I feel an empty spot in my heart which can only be filled by:
filter(Optional::isPresent)
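
For what it's worth, a rough sketch of how close you can get today (this assumes a serde
that returns null on a parse failure; MyEvent, eventSerde and the "events" topic are
made-up names). filter() takes a two-argument predicate, so Optional::isPresent itself
doesn't fit, but the shape is similar:

import java.util.Optional;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
Serde<MyEvent> eventSerde = ...;  // your Jackson-based serde that returns null on bad JSON

KStream<String, MyEvent> parsed = builder
        .stream(Serdes.String(), eventSerde, "events")   // values may be null here
        .mapValues(Optional::ofNullable)                 // KStream<String, Optional<MyEvent>>
        .filter((key, value) -> value.isPresent())       // drop the failed parses
        .mapValues(Optional::get);                       // back to KStream<String, MyEvent>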

On Wed, Oct 12, 2016 at 12:15 AM, Guozhang Wang  wrote:

> Ali,
>
> We are working on moving from Java7 to Java8 in Apache Kafka, and the
> Streams client is one of the motivations doing so. Stay tuned on the
> mailing list when it will come.
>
> Currently Streams won't automatically filter out null values for you since
> in some other cases they may have semantic meanings and cannot be simply
> ignored; you can, though, apply a simple filter such like "filter((key,
> value) => value != null)" before your processor lambda operator, if it
> looks clearer in your code.
>
> Guozhang
>
>
> On Sun, Oct 9, 2016 at 3:14 PM, Ali Akhtar  wrote:
>
> > It isn't a fatal error. It should be logged as a warning, and then the
> > stream should be continued w/ the next message.
> >
> > Checking for null is 'ok', in the sense that it gets the job done, but
> > after java 8's release, we really should be using optionals.
> >
> > Hopefully we can break compatibility w/ the bad old days soon and move
> into
> > the future.
> >
> > (If there's a way to do the null check automatically, i.e before calling
> > the lambda, please let me know).
> >
> > On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang 
> wrote:
> >
> > > Ali,
> > >
> > > In your scenario, if serde fails to parse the bytes should that be
> > treated
> > > as a fatal failure or it is expected?
> > >
> > > In the former case, instead of returning a null I think it is better to
> > > throw a runtime exception in order to let the whole client to stop and
> > > notify the error; in the latter case, returning and checking null looks
> > > fine to me.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar 
> wrote:
> > >
> > > > Hey G,
> > > >
> > > > Looks like the only difference is a valueSerde parameter.
> > > >
> > > > How does that prevent having to look for nulls in the consumer?
> > > >
> > > > E.g, I wrote a custom Serde which converts the messages (which are
> json
> > > > strings) into a Java class using Jackson.
> > > >
> > > > If the json parse fails, it sends back a null.
> > > >
> > > > When I'm reading this stream, in my callback, how would I prevent
> > having
> > > to
> > > > check if the serialized value isn't null?
> > > >
> > > > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Ali,
> > > > >
> > > > > We do have corresponding overloaded functions for most of KStream /
> > > > KTable
> > > > > operators to avoid enforcing users to specify "null"; in these
> cases
> > > the
> > > > > default serdes specified in the configs are then used. For example:
> > > > >
> > > > >  KTable aggregate(Initializer initializer,
> > > > >Aggregator adder,
> > > > >Aggregator subtractor,
> > > > >Serde aggValueSerde,
> > > > >String storeName);
> > > > >
> > > > > /**
> > > > >  * .. using default serializers and deserializers.
> > > > >  */
> > > > >  KTable aggregate(Initializer initializer,
> > > > >Aggregator adder,
> > > > >Aggregator subtractor,
> > > > >String storeName);
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll  >
> > > > wrote:
> > > > >
> > > > > > Ali, the Apache Kafka project still targets Java 7, which means
> we
> > > > can't
> > > > > > use Java 8 features just yet.
> > > > > >
> > > > > > FYI: There's on ongoing conversation about when Kafka would move
> > from
> > > > > Java
> > > > > > 7 to Java 8.
> > > > > >
> > > > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar  >
> > > > wrote:
> > > > > >
> > > > > > > Since we're using Java 8 in most cases anyway, Serdes /
> > > Serialiazers
> > > > > > should
> > > > > > > use options, to avoid having to deal with the lovely nulls.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-11 Thread Ratha v
Sorry, my fault. In the KafkaConsumer I had messed up the 'value.deserializer'
property.
Now things are working fine.
Thanks a lot.
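
For anyone who hits the same thing later: with a custom value type, the consumer's
'value.deserializer' has to point at the matching deserializer class; if it is left at the
StringDeserializer, poll() hands back Strings and the cast to KafkaPayload blows up. A rough
sketch of the relevant consumer config (the package name below is a guess based on the code
earlier in the thread):

Properties props = new Properties();
props.put("bootstrap.servers", "xx.com:9092");
props.put("group.id", "payload-consumer");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
// KafkaPayloadSerializer implements both Serializer and Deserializer, so it can be used here.
props.put("value.deserializer", "xx.kafkamodels.KafkaPayloadSerializer");

KafkaConsumer<String, KafkaPayload> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("targetTopic"));
ConsumerRecords<String, KafkaPayload> records = consumer.poll(Long.MAX_VALUE);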

On 12 October 2016 at 14:10, Ratha v  wrote:

> HI Michael;
> Sorry , after setting "auto.offset.reset"  to 'earliest' , I see messages
> in my 'targetTopic'.
> But still I get my class cast exception issue, when I consume message from
> the 'targetTopic'. (To consume message I use KafkaConsumer highlevel API)
>
> *ConsumerRecords records = consumer.poll(Long.MAX_VALUE);*
>
>
>
> *Exception*
>
> *java.lang.ClassCastException: java.lang.String cannot be cast to
> xxx.core.kafkamodels.KafkaPayload at
> xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108)
> ~[classes/:?]*
>
> at xx.core.listener.KafkaMessageListenerThread.process(
> KafkaMessageListenerThread.java:68) ~[classes/:?]
>
> at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(
> KafkaMessageListenerThread.java:50) ~[classes/:?]
>
> at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
>
> at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(
> KafkaMessageListenerThread.java:50) [classes/:?]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_66]
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
>
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [?:1.8.0_66]
>
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [?:1.8.0_66]
>
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
>
>
>
> On 12 October 2016 at 13:19, Ratha v  wrote:
>
>> HI Michael;
>>
>> Really appreciate for the clear explanation..
>> I modified my code as you mentioned. I have written custom, Serde,
>> serializer,deserializer.
>> But now the problem i see is, both topics are not merged. Means, Messages
>> in the 'sourcetopic' not to passed to 'targetTopic' . ('targetTopic has '0'
>> messages)
>> I do not see any exceptions.
>>
>> Here is my custom serde, serializer/deserializer and the logic; Also I
>> have properties file where i defined  following parameters;
>>
>> *bootstrap.servers=xx.com \:9092,xx.com
>> \:9092,xx.com \:9092*
>>
>> *key.serde=org.apache.kafka.com
>> mon.serialization.Serdes$StringSerde*
>>
>> *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde*
>>
>> *application.id =stream-pipe*
>>
>>
>> Do you see any issue here? Why messages are not written to ' targetTopic'?
>>
>>
>>
>> *LOGIC*
>>
>> /**
>>
>> * create stream from source topics and write it to the target topic
>>
>> * @param sourceTopics
>>
>> * @param targetTopic
>>
>> */
>>
>> public void write(String[] sourceTopics, String targetTopic) {
>>
>>  KafkaStreams streams = null;
>>
>>  KStreamBuilder builder = new KStreamBuilder();
>>
>>   try {
>>
>>KStream kafkaPayloadStream = builder
>> .stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>
>>kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde,
>> targetTopic);
>>
>>streams = new KafkaStreams(builder, properties);
>>
>>streams.start();
>>
>>Thread.sleep(5000L);
>>
>>   } catch (InterruptedException e) {
>>
>>   log.warn(e);
>>
>>  } catch (Exception e) {
>>
>>  log.error("Topic merge failed. ",e);
>>
>>   } finally {
>>
>>if (streams != null) {
>>
>>streams.close();
>>
>>  }
>>
>> }
>>
>> }
>>
>>
>>
>>
>> *SERDE*
>>
>>
>> public class KafkaPayloadSerdes {
>>
>> static private class WrapperSerde implements
>> Serde {
>> final private Serializer serializer;
>> final private Deserializer deserializer;
>>
>> public WrapperSerde(Serializer serializer,
>> Deserializer deserializer) {
>> this.serializer = serializer;
>> this.deserializer = deserializer;
>> }
>>
>> @Override
>> public void configure(Map configs, boolean isKey) {
>> serializer.configure(configs, isKey);
>> deserializer.configure(configs, isKey);
>> }
>>
>> @Override
>> public void close() {
>> serializer.close();
>> deserializer.close();
>> }
>>
>> @Override
>> public Serializer serializer() {
>> return serializer;
>> }
>>
>> @Override
>> public Deserializer deserializer() {
>> return deserializer;
>> }
>> }
>>
>> static public final class KafkaPayloadSerde extends
>> WrapperSerde {
>> public KafkaPayloadSerde() {
>> super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
>> }
>> }
>>
>> /**
>> * A serde for nullable < KafkaPayload> type.
>> */
>> static public Serde KafkaPayload() {
>> return new KafkaPayloadSerde();
>> }
>>
>> }
>>
>>
>> *Serilizer/Deserializer*
>>
>>
>>
>> public class KafkaPayloadSerializer implements Serializer,
>> Deserializer {
>>
>> private static final Logger log = org.apache.logging.log4j.LogManager
>> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
>>
>> @Override
>> public Kafka

Mirror Maker - Message Format Issue?

2016-10-11 Thread Craig Swift
Hello,

I think we're misunderstanding the docs on some level and I need a little
clarification. We have the following setup:

1) 0.8.2 producer -> writing to a Kafka 0.10.0.1 cluster w/ the 0.10 message
format (source cluster).
2) 0.10.0.1 mirror using the 'new consumer', reading from the source cluster
and writing to a Kafka 0.10.0.1 cluster w/ the 0.8.2 message format
(destination cluster). We need some of the features like SSL, hence using
the new consumer.
3) Lots of old 0.8.2 consumers reading from the destination cluster that
still need to be upgraded.

We're seeing errors from the mirror maker when trying to produce to the
destination cluster like the following:

java.lang.IllegalArgumentException: Invalid timestamp -1
at
org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

Is the root problem the 0.8.2 producer sending data to the source cluster,
or the new 0.10 mirror writing data to the destination cluster in the 0.8.2
format? From the docs we were under the impression that the data would be
stored in the source cluster in the 0.10 format regardless of the producer, and
that the mirror could produce to the destination cluster regardless of its
message format setting.

Is this current setup non-functional or is there a way to make this work?
For example, if the mirror producing is the issue could we implement a
custom MirrorMakerMessageHandler? Any advice and clarification would be
helpful, thanks.

Craig
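
Not sure about the root cause either, but if a custom MirrorMakerMessageHandler turns out
to be the way to go, one option (a sketch under the assumption that the -1 comes from
records carrying no valid timestamp) is to build the downstream ProducerRecord without an
explicit timestamp, so the destination producer assigns one itself:

import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper: drop an invalid upstream timestamp instead of passing it through.
static ProducerRecord<byte[], byte[]> toDownstreamRecord(String topic, Integer partition,
        Long upstreamTimestamp, byte[] key, byte[] value) {
    if (upstreamTimestamp == null || upstreamTimestamp < 0) {
        // No usable timestamp from the 0.8.2 producer: let the producer fill one in.
        return new ProducerRecord<>(topic, partition, key, value);
    }
    return new ProducerRecord<>(topic, partition, upstreamTimestamp, key, value);
}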


Re: Understanding out of order message processing w/ Streaming

2016-10-11 Thread Ali Akhtar
P.S, does my scenario require using windows, or can it be achieved using
just KTable?

On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar  wrote:

> Heya,
>
> Say I'm building a live auction site, with different products. Different
> users will bid on different products. And each time they do, I want to
> update the product's price, so it should always have the latest price in
> place.
>
> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on the same
> product 100 ms later.
>
> The second bid arrives first and the price is updated to $5. Then the
> first bid arrives. I want the price to not be updated in this case, as this
> bid is older than the one I've already processed.
>
> Here's my understanding of how I can achieve this with Kafka Streaming -
> is my understanding correct?
>
> - I have a topic for receiving bids. The topic has N partitions, and I
> have N replicas of my application which hooks up w/ Kafka Streaming, up and
> running.
>
> - I assume each replica of my app will listen to a different partition of
> the topic.
>
> - A user makes a bid on product A.
>
> - This is pushed to the topic with the key bid_a
>
> - Another user makes a bid. This is also pushed with the same key (bid_a)
>
> - The 2nd bid arrives first, and gets processed. Then the first (older)
> bid arrives.
>
> - Because I'm using a KTable, the timestamp of the messages is extracted,
> and I'm not shown the older bid because I've already processed the later
> bid. The older bid is ignored.
>
> - All bids on product A go to the same topic partition, and hence the same
> replica of my app, because they all have the key bid_a.
>
> - Because of this, the replica already knows which timestamps it has
> processed, and is able to ignore the older messages.
>
> Is the above understandning correct?
>
> Also, what will happen if bid 2 arrived and got processed, and then the
> particular replica crashed, and was restarted. The restarted replica won't
> have any memory of which timestamps it has previously processed.
>
> So if bid 2 got processed, replica crashed and restarted, and then bid 1
> arrived, what would happen in that case?
>
> Thanks.
>


0.8.2.1 Broker ZK connection loss and high level consumer reaction

2016-10-11 Thread John Holland
Recently I had a situation occur where a network partition happened between
one of the nodes in a 3 node cluster and zookeeper.  The broker affected
never reconnected to ZooKeeper (its ID was not registered in ZK) and the
metrics indicate that it became another active controller.  It still
considered itself leader over the partitions originally assigned to it and
did not indicate any under replicated partitions.

Producers (new producer) did stop producing to it and switched over to the
new leaders, but I'm not sure if it was due to the minISR setting or due to
a metadata update.

The consumers connected to the bad broker didn't react at all and continued
consuming from it.  Lag alerts went off when the consumers started falling
behind on the partitions the bad broker was originally leader of (because
no new data was being received by that broker).

I'm curious to know if anyone has seen behavior like this in 0.8.2.1 before
and if so, does 0.10 help with it?  Ideally I would want the consumers (new
consumer) to react to the fact that this broker split from the cluster and
was no longer receiving data so they could move over to the new partition
leaders.

I would love to supply logs but due to an infrastructure issue many of them
have been lost.

-John


How can I delete a topic programatically?

2016-10-11 Thread Ratha v
Hi all;

I have two topics (source and target). I do some processing on the messages
available in the source topic and I merge both topics.
That is;

builder.stream(sourceTopic).to(targetTopic)

Once merged I no longer require the sourceTopic. I want to delete it.

How can I do that programmatically in Java? I use the high-level client APIs,
Kafka v0.10.0.1


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: How can I delete a topic programatically?

2016-10-11 Thread Ali Akhtar
The last time I tried, I couldn't find a way to do it, other than to
trigger the bash script for topic deletion programmatically.
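
For completeness: in 0.10.0.x there is also the ZooKeeper-based AdminUtils route. It needs
the kafka core jar (not just kafka-clients) on the classpath and 'delete.topic.enable=true'
on the brokers; a sketch, with a made-up ZooKeeper connect string:

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class DeleteTopicSketch {

    public static void main(String[] args) {
        ZkUtils zkUtils = ZkUtils.apply("zkhost1:2181", 30000, 30000, false);
        try {
            // Marks the topic for deletion; the brokers then remove it asynchronously.
            AdminUtils.deleteTopic(zkUtils, "sourceTopic");
        } finally {
            zkUtils.close();
        }
    }
}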

On Wed, Oct 12, 2016 at 9:18 AM, Ratha v  wrote:

> Hi all;
>
> I have two topics(source and target). I do some processing on the message
> available in the source topic and i merge both topic.
> That is;
>
> builder.stream(sourceTopic).to(targetTopic)
>
> Once merged I no longer require the sourceTopic. I want to delete it.
>
> How can I do that programatically in java? I use highelevel  client APIs,
> kafka v 0.10.0.1
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: How can I delete a topic programatically?

2016-10-11 Thread Ratha v
Thanks. Which bash script do I need to run?

On 12 October 2016 at 15:20, Ali Akhtar  wrote:

> The last time I tried, I couldn't find a way to do it, other than to
> trigger the bash script for topic deletion programatically.
>
> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v  wrote:
>
> > Hi all;
> >
> > I have two topics(source and target). I do some processing on the message
> > available in the source topic and i merge both topic.
> > That is;
> >
> > builder.stream(sourceTopic).to(targetTopic)
> >
> > Once merged I no longer require the sourceTopic. I want to delete it.
> >
> > How can I do that programatically in java? I use highelevel  client APIs,
> > kafka v 0.10.0.1
> >
> >
> > Thanks
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How can I delete a topic programatically?

2016-10-11 Thread Jianbin Wei
You can check this

http://kafka.apache.org/documentation.html#basic_ops_add_topic

But from our experience it is best to delete topics one by one, i.e., make sure Kafka is in 
good shape before and after deleting each topic before moving on to the next one.

Regards,

-- Jianbin

> On Oct 11, 2016, at 9:26 PM, Ratha v  wrote:
> 
> Thanks..Which bash script I need to run?
> 
>> On 12 October 2016 at 15:20, Ali Akhtar  wrote:
>> 
>> The last time I tried, I couldn't find a way to do it, other than to
>> trigger the bash script for topic deletion programatically.
>> 
>>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v  wrote:
>>> 
>>> Hi all;
>>> 
>>> I have two topics(source and target). I do some processing on the message
>>> available in the source topic and i merge both topic.
>>> That is;
>>> 
>>> builder.stream(sourceTopic).to(targetTopic)
>>> 
>>> Once merged I no longer require the sourceTopic. I want to delete it.
>>> 
>>> How can I do that programatically in java? I use highelevel  client APIs,
>>> kafka v 0.10.0.1
>>> 
>>> 
>>> Thanks
>>> --
>>> -Ratha
>>> http://vvratha.blogspot.com/
>>> 
>> 
> 
> 
> 
> -- 
> -Ratha
> http://vvratha.blogspot.com/


Re: Training Kafka and ZooKeeper - Monitoring and Operability

2016-10-11 Thread Ben Stopford
Useful resource Nico, Thanks

B

On Tuesday, 11 October 2016, Nicolas Motte  wrote:

> Hi everyone,
>
> I created a training for Application Management and OPS teams in my
> company.
> Some sections are specific to our deployment, but most of them are generic
> and explain how Kafka and ZooKeeper work.
>
> I uploaded it on SlideShare, I thought it might be useful to other people:
> http://fr.slideshare.net/NicolasMotte/training-kafka-
> and-zookeeper-monitoring-and-operability
>
> In the description you will get a link to the version with audio
> description.
>
> Cheers
> Nico
>


-- 
Ben Stopford


Re: How can I delete a topic programatically?

2016-10-11 Thread Ratha v
Thank you..

On 12 October 2016 at 16:30, Jianbin Wei  wrote:

> You can check this
>
> http://kafka.apache.org/documentation.html#basic_ops_add_topic
>
> But from our experience it is best to delete topics one by one, i.e., make
> sure Kafka is in good shape before and after deleting a topic before
> working on next one.
>
> Regards,
>
> -- Jianbin
>
> > On Oct 11, 2016, at 9:26 PM, Ratha v  wrote:
> >
> > Thanks..Which bash script I need to run?
> >
> >> On 12 October 2016 at 15:20, Ali Akhtar  wrote:
> >>
> >> The last time I tried, I couldn't find a way to do it, other than to
> >> trigger the bash script for topic deletion programatically.
> >>
> >>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v 
> wrote:
> >>>
> >>> Hi all;
> >>>
> >>> I have two topics(source and target). I do some processing on the
> message
> >>> available in the source topic and i merge both topic.
> >>> That is;
> >>>
> >>> builder.stream(sourceTopic).to(targetTopic)
> >>>
> >>> Once merged I no longer require the sourceTopic. I want to delete it.
> >>>
> >>> How can I do that programatically in java? I use highelevel  client
> APIs,
> >>> kafka v 0.10.0.1
> >>>
> >>>
> >>> Thanks
> >>> --
> >>> -Ratha
> >>> http://vvratha.blogspot.com/
> >>>
> >>
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/