usage of deprecated method in kafka_2.12-1.0.0

2018-02-21 Thread pravin kumar
I have tried the wikifeed example with kafka_2.12-1.0.0. The count method is now
deprecated.

Previously, in kafka_2.11-0.10.2.1, I passed count(localStateStoreName).

How do I give the state store name in kafka_2.12-1.0.0?

I have attached the code below:
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {
    final static String WIKIFEED_INPUT = "wikifeedInput";
    final static String WIKIFEED_OUTPUT = "wikifeedOutput";
    final static String WIKIFEED_LAMBDA = "WikiFeedLambda";
    final static String BOOTSTRAP_SERVERS = "localhost:9092";
    final static String COUNT_STORE = "countstore";
    final static String STAT_DIR = "/home/admin/Documents/kafka_2.12.1.0.0/kafka-streams";

    public static void main(String[] args) {
        KafkaStreams kafkaStreams = getWikifeedStreams();
        kafkaStreams.cleanUp();
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    public static KafkaStreams getWikifeedStreams() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream inputStream = builder.stream(WIKIFEED_INPUT);
        KTable kTable = inputStream
                .filter((key, value) -> value.isNew())
                .map((key, value) -> KeyValue.pair(value.getName(), value))
                .groupByKey()
                .count(COUNT_STORE);
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);

        return streams;
    }
}
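
For reference, here is a minimal sketch of the non-deprecated form in 1.0.0, where the store
name is supplied through Materialized.as(...) instead of a String argument. The Wikifeed value
type and WikifeedSerde are assumptions taken from the code above, not a definitive answer:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

        // inside getWikifeedStreams(), replacing the deprecated .count(COUNT_STORE) call;
        // Wikifeed is assumed to be the example's POJO exposing isNew() and getName()
        KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
        KTable<String, Long> counts = inputStream
                .filter((key, value) -> value.isNew())
                .map((key, value) -> KeyValue.pair(value.getName(), value))
                .groupByKey()
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(COUNT_STORE));
        counts.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));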


RE: difference between key.serializer & default.key.serde

2018-02-21 Thread adrien ruffie
Hello Matthias,


many thanks for your response. I knew the difference between deserializer --> 
consumer and serializer --> producer,


but I didn't know the differences between the low-level and high-level APIs.


I've taken note of it.


Thanks again & best regards.


Adrien


From: Matthias J. Sax 
Sent: Wednesday, 21 February 2018 23:45:26
To: users@kafka.apache.org
Subject: Re: difference between key.serializer & default.key.serde

These are different abstractions used in different APIs.

Consumer API:

Only reads data (with a single type) and thus uses a deserializer and the
config `key.deserializer`.


Producer API:

Only writes data (with a single type) and thus uses a serializer and the
config `key.serializer`.


Streams API:

Reads and writes data and uses a Serde (short for Serializer/Deserializer).
It's a wrapper class for a serializer and a deserializer at once. Thus,
the config used is `default.key.serde`. Its name uses the prefix `default`
because you can override the Serde specified in the config at the operator
level. In the Streams API, you usually handle more than one data type and
thus usually need more than one Serde.
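
As a quick side-by-side illustration (a sketch, not part of the original reply; the topic
name and the counts KTable referenced in the last comment are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

public class SerdeVsSerializerSketch {
    public static void main(String[] args) {
        // Consumer API: reads only, so just a deserializer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // Producer API: writes only, so just a serializer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        // Streams API: a Serde wraps both directions, and this one is only the default ...
        Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());

        // ... which can be overridden per operator, e.g. for a Long-valued KTable:
        // counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    }
}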


-Matthias

On 2/21/18 1:28 PM, adrien ruffie wrote:
> Hello all, I read the documentation but I don't really understand the difference 
> between
>
>
> default.key.serde and key.serializer + key.deserializer
>
>
> and
>
>
> default.value.serde and value.serializer + value.deserializer
>
>
> I don't understand the different usages ...
>
>
> Can you enlighten me a little more please?
>
>
> Best regards,
>
>
> Adrien
>



Doubts about multiple instance in kafka

2018-02-21 Thread pravin kumar
I have read the Confluent Kafka documentation.

But I can't understand the following passage:

"It is important to understand that Kafka Streams is not a resource
manager, but a library that “runs” anywhere its stream processing
application runs. Multiple instances of the application are executed either
on the same machine, or spread across multiple machines and tasks can
be distributed
automatically by the library

to those running application instances"

I have tried to run it on the same machine with multiple JVMs with multiple
consumers.

Is that the correct way to run it on the same machine, using multiple consumers?
Or is there another way?
I have attached the code below:
package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 23/10/17.
 */
public class MultiPartitionMultiConsumerDriver {

    public static final String CONSUMER_GROUP_ID = "multipartitionmulticonsumerdriver2";
    private static final int MAX_RECORDS = 1;

    public static void main(String[] args) throws InterruptedException {
        produceInput();
        consumerOutput();
    }

    public static Properties getConsumerProps() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass().getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");

        return properties;
    }

    public static void produceInput() {
        Random random = new Random();
        String[] msg = {"hi", "my", "name", "is", "pravin", "kumar", "studied", "in", "madras", "institute", "of", "technology",
                "hi", "my", "name", "is", "pravin", "kumar", "studied", "in", "good", "shepherd", "school", "properties", "put",
                "ConsumerConfig", "BOOTSTRAP", "SERVERS", "CONFIG", "Single", "Partition", "MultiConsumer", "UsingStream",
                "BOOTSTRAP", "SERVER", "properties", "put", "StreamsConfig", "DEFAULT", "KEY", "SERDE", "CLASS", "CONFIG",
                "Serdes", "String", "getClass", "getName"};
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
        KafkaProducer producer = new KafkaProducer(producerProps);
        IntStream.range(0, MAX_RECORDS)
                .forEach(record -> producer.send(new ProducerRecord
                        (MultiPartitionMultiConsumerUsingStream.INPUT_TOPIC, null, msg[random.nextInt(msg.length)]))); //msg[random.nextInt(msg.length)]

        producer.flush();
    }

    public static void consumerOutput() throws InterruptedException {
        Properties consumerProps = getConsumerProps();
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Collections.singleton(MultiPartitionMultiConsumerUsingStream.OUTPUT_TOPIC));
        while (true) {
            Thread.sleep(5_000);
            consumer.poll(Long.MAX_VALUE).forEach(ConsumerRecord ->
                    System.out.println("Partition :" + ConsumerRecord.partition() + "Key : " + ConsumerRecord.key() + "Value : " + ConsumerRecord.value()));
        }
    }
}
package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import 
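
Regarding the question above, a minimal sketch of what "multiple instances" means for Kafka
Streams: start the very same application, with the same application.id, in several JVMs (or
machines), and the library spreads the tasks across whichever instances are running. All names
below are assumptions for illustration, not the poster's code:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class MultiInstanceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the application.id must be identical in every JVM; Streams treats them all as one group
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "multi-instance-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");   // 4 input partitions -> 4 tasks

        // Run this same main() in JVM 1, JVM 2, ... on the same or different machines.
        // The tasks are rebalanced across the running instances automatically; no
        // hand-written consumer distribution code is needed.
        new KafkaStreams(builder.build(), props).start();
    }
}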

[VOTE] 1.0.1 RC2

2018-02-21 Thread Ewen Cheslack-Postava
Hello Kafka users, developers and client-developers,

This is the third candidate for release of Apache Kafka 1.0.1.

This is a bugfix release for the 1.0 branch that was first released with
1.0.0 about 3 months ago. We've fixed 49 issues since that release. Most of
these are non-critical, but in aggregate these fixes will have significant
impact. A few of the more significant fixes include:

* KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
plugins
* KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
down conversion
* KAFKA-6269: KTable state restore fails after rebalance
* KAFKA-6190: GlobalKTable never finishes restoring when consuming
transactional messages
* KAFKA-6529: Stop file descriptor leak when client disconnects with staged
receives
* KAFKA-6238: Issues with protocol version when applying a rolling upgrade
to 1.0.0

Release notes for the 1.0.1 release:
http://home.apache.org/~ewencp/kafka-1.0.1-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Saturday Feb 24, 9pm PT ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~ewencp/kafka-1.0.1-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~ewencp/kafka-1.0.1-rc2/javadoc/

* Tag to be voted upon (off 1.0 branch) is the 1.0.1 tag:
https://github.com/apache/kafka/tree/1.0.1-rc2

* Documentation:
http://kafka.apache.org/10/documentation.html

* Protocol:
http://kafka.apache.org/10/protocol.html


Thanks,
Ewen Cheslack-Postava


Re: difference between key.serializer & default.key.serde

2018-02-21 Thread Matthias J. Sax
These are different abstractions used in different APIs.

Consumer API:

Only reads data (with a single type) and thus uses a deserializer and the
config `key.deserializer`.


Producer API:

Only writes data (with a single type) and thus uses a serializer and the
config `key.serializer`.


Streams API:

Reads and writes data and uses a Serde (short for Serializer/Deserializer).
It's a wrapper class for a serializer and a deserializer at once. Thus,
the config used is `default.key.serde`. Its name uses the prefix `default`
because you can override the Serde specified in the config at the operator
level. In the Streams API, you usually handle more than one data type and
thus usually need more than one Serde.


-Matthias

On 2/21/18 1:28 PM, adrien ruffie wrote:
> Hello all, I read the documentation but I don't really understand the difference 
> between
> 
> 
> default.key.serde and key.serializer + key.deserializer
> 
> 
> and
> 
> 
> default.value.serde and value.serializer + value.deserializer
> 
> 
> I don't understand the different usages ...
> 
> 
> Can you enlighten me a little more please?
> 
> 
> Best regards,
> 
> 
> Adrien
> 





Re: Error handling

2018-02-21 Thread Guozhang Wang
Hello Maria,

You are welcome to read the faq section on AK web docs:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ

And there are also corresponding sections on Confluent docs:

https://docs.confluent.io/current/streams/faq.html#failure-and-exception-handling
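
One concrete mechanism those pages cover, sketched here for illustration (the class wrapper and
the commented routing with its topic names and isValid() helper are assumptions, not Guozhang's
code): let Streams log and skip records that cannot be deserialized, and send records that fail
your own validation to a dedicated error/retry topic.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class ErrorHandlingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // available since Kafka 1.0: log and skip corrupt records instead of stopping the app
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class.getName());

        // Application-level failures can be branched off inside the topology, e.g.:
        // input.filterNot((key, value) -> isValid(value)).to("orders-errors");
    }
}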


Guozhang



On Thu, Feb 15, 2018 at 10:18 PM, Maria Pilar  wrote:

> Hi everyone
>
> I'm designing error-control handling for my Kafka Streams application.
> I would like to know of any documentation or best practices that I can read.
> Basically I'm creating some error topics for failed messages and retry
> topics.
>
> Any suggestions?
>
> Thanks
>



-- 
-- Guozhang


difference between key.serializer & default.key.serde

2018-02-21 Thread adrien ruffie
Hello all, I read the documentation but I don't really understand the difference 
between


default.key.serde and key.serializer + key.deserializer


and


default.value.serde and value.serializer + value.deserializer


I don't understand the different usages ...


Can you enlighten me a little more please?


Best regards,


Adrien


FINAL REMINDER: CFP for Apache EU Roadshow Closes 25th February

2018-02-21 Thread Sharan F

Hello Apache Supporters and Enthusiasts

This is your FINAL reminder that the Call for Papers (CFP) for the 
Apache EU Roadshow is closing soon. Our Apache EU Roadshow will focus on 
Cloud, IoT, Apache Tomcat, Apache Http and will run from 13-14 June 2018 
in Berlin.
Note that the CFP deadline has been extended to 25th February and 
it will be your final opportunity to submit a talk for this event.


Please make your submissions at http://apachecon.com/euroadshow18/

Also note that early bird ticket registrations to attend FOSS Backstage 
including the Apache EU Roadshow, have also been extended and will be 
available until 23rd February. Please register at 
https://foss-backstage.de/tickets


We look forward to seeing you in Berlin!

Thanks
Sharan Foga, VP Apache Community Development

PLEASE NOTE: You are receiving this message because you are subscribed 
to a user@ or dev@ list of one or more Apache Software Foundation projects.




Re: committing consumed offsets synchronously (every message)

2018-02-21 Thread Sönke Liebau
Kafka Streams would enable exactly once processing, yes. But this only
holds true as long as your data stays in Kafka topics, as soon as you
want to write data to an external system the exactly once guarantees
don't hold true any more and you end up with the same issues - so I
suspect that this would only move your issues to a later date. The same
goes for Kafka Connect, without implementing a proper 2 phase commit
protocol I don't think there is any way that true exactly once
processing from Kafka to an external system is possible.

I don't think that there would be a large performance benefit when
using Streams as I assume (have never checked though, maybe someone
else can chime in here) that Streams internally uses the same Java
objects for reading from a topic - if anything the transaction
overhead from exactly once processing might actually slow it down even
more.

If you are writing to a traditional database something like this might
bring you closer to your target:

1. read from kafka
2. start transaction in db
3. update whatever your target table is
4. record unique id for the record in a "processed" table
5. commit transaction
6. commit offset to Kafka

You could do this for batches as well, there is not strictly speaking
a need to limit yourself to one record - though you need to ensure
that you roll back the entire transaction if one record fails. This
way you could use the "processed" table to check whether a record was
already processed if your job fails between steps 5 and 6.
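
A rough sketch of steps 1-6 above using plain JDBC (the JDBC URL, topic name, table names and
the topic-partition-offset id scheme are assumptions for illustration, not a drop-in solution):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceToDbSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-writer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");     // offsets committed manually below

        try (Connection db = DriverManager.getConnection("jdbc:postgresql://localhost/demo");
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            db.setAutoCommit(false);                                      // 2. DB transactions start implicitly
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {   // 1. read from Kafka
                    String recordId = record.topic() + "-" + record.partition() + "-" + record.offset();
                    try (PreparedStatement upd = db.prepareStatement(
                                 "UPDATE target_table SET total = total + 1 WHERE id = ?");
                         PreparedStatement mark = db.prepareStatement(
                                 "INSERT INTO processed_records (record_id) VALUES (?)")) {
                        upd.setString(1, record.key());
                        upd.executeUpdate();                              // 3. update the target table
                        mark.setString(1, recordId);
                        mark.executeUpdate();                             // 4. remember this record as processed
                        db.commit();                                      // 5. commit the DB transaction
                        consumer.commitSync(Collections.singletonMap(     // 6. commit the offset to Kafka
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    } catch (SQLException e) {
                        db.rollback();                                    // one failure rolls back steps 3 and 4 together
                        throw e;
                    }
                }
            }
        }
    }
}

If the job dies between steps 5 and 6 the record is re-read after restart, and the
processed_records lookup (or a unique constraint on record_id) is what turns the duplicate
into a no-op.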

Best regards,
Sönke

On Wed, Feb 21, 2018 at 4:26 PM, Marasoiu, Nicu
 wrote:
> Thank you very much,
> Would you think that Kafka-Streams with exactly_once flag enabled would 
> perform better than kafka client with individual commit per message as timed 
> below? Perhaps the implementation of exactly-once read-process-write is using 
> other methods and its performance is better.
> Indeed, incrementing a counter per processed message key in our database 
> would be one way of accounting for duplicate processing, but I am not sure 
> how can I do this in an efficient way (not querying all table).
> Until now I concentrated on accounting for duplicate keys in topics via a 
> kafka-streams job. That might be enough only if we transform a code that we 
> have for main business logic in a pure function, and create the effect of 
> writing to the database via a kafka connector. Since I understand both 
> streams and connectors support exactly once, it would be a possibility to 
> eliminate the possibility of duplicate processing downstream of a topic.
>
> Thanks for your help,
> Nicu
> 
> From: Sönke Liebau [soenke.lie...@opencore.com.INVALID]
> Sent: Wednesday, February 21, 2018 4:59 PM
> To: users@kafka.apache.org
> Subject: Re: committing consumed offsets synchronously (every message)
>
> Hi Nicu,
>
> committing after every message and thus retrieving them with a batch size
> of 1 will definitely make a huge difference in performance!
> I've rigged a quick (and totally non academic) test which came up with the
> following numbers:
>
> Batching consumer - Consumed 1000490 records in 5 seconds
> Non Batching, commiting consumer - Consumed 100 records in 3023 seconds
>
> The first line was a consumer with default settings and auto.offset.commit,
> the second one retrieved messages one per poll and called commitSync after
> every message.
>
>
> I am not sure if you actually need this though, wouldn't your deduplication
> process be able to check the downstream system, whether that specific
> message was already processed and use that to identify duplicates?
> Or are you not sending the actual records downstream but just doing
> something like summing, counting, ... them?
>
> It's tough to be more specific without knowing more specifics, but maybe
> that helps a bit already?
>
> Best regards,
> Sönke
>
> On Wed, Feb 21, 2018 at 11:57 AM, Marasoiu, Nicu <
> nicu.maras...@metrosystems.net> wrote:
>> Hi,
>> In order to obtain an exactly-once semantics, we are thinking of doing
> at-least-once processing, and then have a compensation mechanism to fix the
results in a few minutes by correcting them by subtracting the effects of
> the duplicates. However, in order to do that, it seems that at least this
> compensation mechanism needs to read from a topic and commit offsets every
> message, so that when failover happens, it would not interpret as
> duplicates the events from the latest commit until present. What are the
> performance implications of this, and what advice would you have for
> exactly-once behavior (at least with controllable error)?
>> Thank you,
>> Nicu Marasoiu

Building a bugfix branch and using it with existing distributions

2018-02-21 Thread Niek Peeters
Hi all,


Recently, I experienced a bug in Connect (dist 1.0.0) only to find out it was 
registered and fixed already 
(KAFKA-6277). The fix got 
backported into the 1.0.0 branch but obviously the 1.0.0 release (and 
distribution) didn't change.

The kafka distribution/release I use (1.0.0) has several jars in its libs 
folder, and I want to use the version in which the bugfix is included.


In which way can I "upgrade" my Connect cluster? I guess I would have to build 
Kafka myself, but how do I then know which jars to copy, and do I need to copy 
them from all the separate project folders (core/connect/etc.)?


Thanks in advance,

Niek Peeters


RE: committing consumed offsets synchronously (every message)

2018-02-21 Thread Marasoiu, Nicu
Thank you very much,
Would you think that Kafka Streams with the exactly_once flag enabled would perform 
better than a Kafka client with an individual commit per message, as timed below? 
Perhaps the implementation of exactly-once read-process-write uses other 
methods and its performance is better.
Indeed, incrementing a counter per processed message key in our database would 
be one way of accounting for duplicate processing, but I am not sure how I can 
do this in an efficient way (without querying the whole table).
Until now I concentrated on accounting for duplicate keys in topics via a 
Kafka Streams job. That might be enough only if we transform the code that we 
have for the main business logic into a pure function, and create the effect of 
writing to the database via a Kafka connector. Since I understand both Streams 
and connectors support exactly-once, it would be possible to eliminate the 
possibility of duplicate processing downstream of a topic.

Thanks for your help,
Nicu

From: Sönke Liebau [soenke.lie...@opencore.com.INVALID]
Sent: Wednesday, February 21, 2018 4:59 PM
To: users@kafka.apache.org
Subject: Re: committing consumed offsets synchronously (every message)

Hi Nicu,

committing after every message and thus retrieving them with a batch size
of 1 will definitely make a huge difference in performance!
I've rigged a quick (and totally non academic) test which came up with the
following numbers:

Batching consumer - Consumed 1000490 records in 5 seconds
Non Batching, commiting consumer - Consumed 100 records in 3023 seconds

The first line was a consumer with default settings and auto.offset.commit,
the second one retrieved messages one per poll and called commitSync after
every message.


I am not sure if you actually need this though, wouldn't your deduplication
process be able to check the downstream system, whether that specific
message was already processed and use that to identify duplicates?
Or are you not sending the actual records downstream but just doing
something like summing, counting, ... them?

It's tough to be more specific without knowing more specifics, but maybe
that helps a bit already?

Best regards,
Sönke

On Wed, Feb 21, 2018 at 11:57 AM, Marasoiu, Nicu <
nicu.maras...@metrosystems.net> wrote:
> Hi,
> In order to obtain an exactly-once semantics, we are thinking of doing
at-least-once processing, and then have a compensation mechanism to fix the
results in a few minutes by correcting them by subtracting the effects of
the duplicates. However, in order to do that, it seems that at least this
compensation mechanism needs to read from a topic and commit offsets every
message, so that when failover happens, it would not interpret as
duplicates the events from the latest commit until present. What are the
performance implications of this, and what advice would you have for
exactly-once behavior (at least with controllable error)?
> Thank you,
> Nicu Marasoiu



--
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany

Re: Doubts in KStreams

2018-02-21 Thread Bill Bejeck
Hi Pravin,

1.  Fault tolerance means that state stores are backed by changelog topics
storing the contents of the state store.  For example, in a
worst case scenario, your machine crashed destroying all your local state,
on starting your Kafka Streams application back up the state stores would
recover the data (up to the last committed offset) from their backing
changelog topics.

2. If you have a multiple instance Kafka Streams application (A and B) and
instance A dies, a rebalance occurs, and the tasks from A get assigned to
B.  If instance A had any local state then the new tasks on B use the
changelog topics to fill the state stores for the new task, so the local
state store is recovered up to the last committed offset of the state store
on A before it crashed.
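
As a small illustration (not from the original reply; the application id is an assumed name):
changelog-backed stores are on by default, and the num.standby.replicas setting keeps warm
copies of the local state on other instances so the recovery described above completes faster.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);   // keep one standby copy of each task's state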

HTH

-Bill

On Wed, Feb 21, 2018 at 5:40 AM, pravin kumar  wrote:

> I have studied Kafka Streams, but have not clearly understood the following:
>
> 1. Can someone explain fault tolerance?
> 2. I have topicA and topicB with 4 partitions, so it created four tasks; I
> have created them in a single JVM. But I need to know how this works across
> multiple JVMs: if one JVM goes down, how does another JVM take over the
> responsibility, and how is the local state store recreated in the JVM that takes over?
>


Re: committing consumed offsets synchronously (every message)

2018-02-21 Thread Sönke Liebau
Hi Nicu,

committing after every message and thus retrieving them with a batch size
of 1 will definitely make a huge difference in performance!
I've rigged a quick (and totally non academic) test which came up with the
following numbers:

Batching consumer - Consumed 1000490 records in 5 seconds
Non Batching, commiting consumer - Consumed 100 records in 3023 seconds

The first line was a consumer with default settings and auto.offset.commit,
the second one retrieved messages one per poll and called commitSync after
every message.


I am not sure if you actually need this though, wouldn't your deduplication
process be able to check the downstream system, whether that specific
message was already processed and use that to identify duplicates?
Or are you not sending the actual records downstream but just doing
something like summing, counting, ... them?

It's tough to be more specific without knowing more specifics, but maybe
that helps a bit already?

Best regards,
Sönke

On Wed, Feb 21, 2018 at 11:57 AM, Marasoiu, Nicu <
nicu.maras...@metrosystems.net> wrote:
> Hi,
> In order to obtain an exactly-once semantics, we are thinking of doing
at-least-once processing, and then have a compensation mechanism to fix the
results in a few minutes by correcting them by subtracting the effects of
the duplicates. However, in order to do that, it seems that at least this
compensation mechanism needs to read from a topic and commit offsets every
message, so that when failover happens, it would not interpret as
duplicates the events from the latest commit until present. What are the
performance implications of this, and what advice would you have for
exactly-once behavior (at least with controllable error)?
> Thank you,
> Nicu Marasoiu



-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


committing consumed offsets synchronously (every message)

2018-02-21 Thread Marasoiu, Nicu
Hi,
In order to obtain exactly-once semantics, we are thinking of doing 
at-least-once processing, and then having a compensation mechanism that fixes the 
results within a few minutes by correcting them, i.e. subtracting the effects of the 
duplicates. However, in order to do that, it seems that at least this 
compensation mechanism needs to read from a topic and commit offsets on every 
message, so that when failover happens, it would not interpret the events 
from the latest commit until the present as duplicates. What are the performance 
implications of this, and what advice would you have for exactly-once behavior 
(at least with a controllable error)?
Thank you,
Nicu Marasoiu



Doubts in KStreams

2018-02-21 Thread pravin kumar
I have studied Kafka Streams, but have not clearly understood the following:

1. Can someone explain fault tolerance?
2. I have topicA and topicB with 4 partitions, so it created four tasks; I
have created them in a single JVM. But I need to know how this works across
multiple JVMs: if one JVM goes down, how does another JVM take over the
responsibility, and how is the local state store recreated in the JVM that takes over?


RE: broker properties explanations

2018-02-21 Thread adrien ruffie
It really helps me, thanks Thomas!


Just one more quick question:


I don't correctly understand this property:

leader.imbalance.per.broker.percentage: The ratio of leader imbalance allowed 
per broker. The controller would trigger a leader balance if it goes above this 
value per broker. The value is specified in percentage. (type: int, default: 10)

How is the ratio calculated? The imbalanced broker is compared to what? Moreover, 
the default value of 10% seems very small, no?


Thanks again Thomas,


Adrien


From: Thomas Aley 
Sent: Wednesday, 21 February 2018 09:24:13
To: users@kafka.apache.org
Subject: Re: broker properties explanations

Hi Adrien,

log.dirs exists to facilitate multiple data directories which allows more
than one disk to be used without the need for RAID. This increases
throughput but beware of naive load balancing that may fill up one disk
way before another.

When log.flush.interval.ms is null the log.flush.interval.messages
property is not used. With default settings, messages are written to disk
immediately.

Hope this helps.

Tom Aley
thomas.a...@ibm.com



From:   adrien ruffie 
To: "users@kafka.apache.org" 
Date:   20/02/2018 20:46
Subject:broker properties explanations



Hello all,

after reading several properties in the Kafka documentation, I asked myself
some questions ...


these 2 following options are available:

log.dir: The directory in which the log data is kept (supplemental for
the log.dirs property). (type: string, default: /tmp/kafka-logs, importance: high)
log.dirs: The directories in which the log data is kept. If not set,
the value in log.dir is used.
But in fact, if it is the same thing, why isn't only "log.dirs" kept? What's
the difference in usage?


Also I noticed that the "data" of the log partitions and also the
"application" logs of Kafka are written in the same directory.

Is that a mistake on my part? Because it's very strange for me to log error,
debug and warn application messages in the same location as my data ...



After that, I don't really understand why log.flush.interval.messages
has such a big default value ???

log.flush.interval.messages: The number of messages accumulated on a
log partition before messages are flushed to disk. (type: long,
default: 9223372036854775807)
And log.flush.interval.ms is null by default ... ?

Does it mean that until there are that many messages (9223372036854775807) in my
topics, they will not be flushed to disk? That can be very long for a
default value ...


Best regards,


Adrien





Re: Who is assigned to which partitions

2018-02-21 Thread Per Steffensen

On 21/02/18 08:42, Per Steffensen wrote:
..., when I will happy to do share? 

..., THEN I will BE happy to share


Re: broker properties explanations

2018-02-21 Thread Thomas Aley
Hi Adrien,

log.dirs exists to facilitate multiple data directories which allows more 
than one disk to be used without the need for RAID. This increases 
throughput but beware of naive load balancing that may fill up one disk 
way before another.

When log.flush.interval.ms is null the log.flush.interval.messages 
property is not used. With default settings, messages are written to disk 
immediately. 

Hope this helps.

Tom Aley
thomas.a...@ibm.com



From:   adrien ruffie 
To: "users@kafka.apache.org" 
Date:   20/02/2018 20:46
Subject:broker properties explanations



Hello all,

after reading several properties in the Kafka documentation, I asked myself 
some questions ...


these 2 following options are available:

log.dir: The directory in which the log data is kept (supplemental for 
the log.dirs property). (type: string, default: /tmp/kafka-logs, importance: high)
log.dirs: The directories in which the log data is kept. If not set, 
the value in log.dir is used.
But in fact, if it is the same thing, why isn't only "log.dirs" kept? What's 
the difference in usage?


Also I noticed that the "data" of the log partitions and also the 
"application" logs of Kafka are written in the same directory.

Is that a mistake on my part? Because it's very strange for me to log error, 
debug and warn application messages in the same location as my data ...



After that, I don't really understand why log.flush.interval.messages 
has such a big default value ???

log.flush.interval.messages: The number of messages accumulated on a 
log partition before messages are flushed to disk. (type: long, 
default: 9223372036854775807)
And log.flush.interval.ms is null by default ... ?

Does it mean that until there are that many messages (9223372036854775807) in my 
topics, they will not be flushed to disk? That can be very long for a 
default value ...


Best regards,


Adrien





Re: Consumer group intermittently can not read any records from a cluster with 3 nodes that has one node down

2018-02-21 Thread Sandor Murakozi
hi Behrang,
I recommend you check out some docs that explain how partitions and
replication work (e.g.
https://sookocheff.com/post/kafka/kafka-in-a-nutshell/)

What I'd highlight is that the partition leader and the controller are two
different concepts. Each partition has its own leader, and it's the leader
and not the controller that's responsible for dealing with producers and
consumers.
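
For anyone debugging a similar situation, a small sketch (broker addresses and the topic name
are made up) that prints each partition's leader via the AdminClient; the leader, not the
controller, is the broker that serves produce and fetch requests for that partition:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class PartitionLeaderCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription topic = admin.describeTopics(Collections.singleton("my-topic"))
                    .all().get().get("my-topic");
            for (TopicPartitionInfo p : topic.partitions()) {
                // a partition with no leader cannot be produced to or consumed from
                System.out.println("partition " + p.partition()
                        + " leader=" + p.leader() + " isr=" + p.isr());
            }
        }
    }
}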

Cheers,
Sandor

On Tue, Feb 20, 2018 at 12:50 PM, Behrang  wrote:

> Hi Sandor,
>
> Thanks for your reply. I am not at work right now, but I still am a bit
> confused about what happened at work:
>
> 1- One thing that I confirmed was that one of the 3 nodes was definitely down.
> We were unable to telnet into its Kafka port from anywhere. The other two
> nodes were up and we could telnet into their Kafka port.
>
> 2- I modified my app a bit and implemented a means for sending
> DescribeCluster requests to the cluster, setting bootrstrap-servers to all
> the 3 nodes. The result indicated that the controller node (
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/
> DescribeClusterResult.html#controller())
> had an id that was not amongst the nodes (
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/
> DescribeClusterResult.html#nodes()).
> It was the same node that was down (i.e. I could telnet into the other
> nodes but not the controller node). And this was always the same, even
> after a few minutes, the controller node's id was still the same.
>
> 3- Despite that, when running my app from my machine, I could get records
> from the topics I had subscribed to, but from another machine, no records
> were getting sent to the app. The app running on the other machine had a
> different consumer groups though.
>
> 4- The cluster had three nodes and when the controller node was down, most
> of the time I was getting a message like this: *"Connection to node -N
> could not be established. Broker may not be available."* where N was either
> -1, -2, or -3 but at one point in my app's logs I found a handful of
> entries in which N was a very large number (e.g. 2156987456).
>
> I assume our cluster was misbehaving, but still can't explain why my app
> was working like this.
>
>
> Best regards,
> Behrang Saeedzadeh
>
> On 20 February 2018 at 19:22, Sandor Murakozi  wrote:
>
> > Hi Behrang,
> >
> > All reads and writes of a partition go through the leader of that
> > partition.
> > If the leader of a partition is down you will not be able to
> > produce/consume data in it until a new leader is elected. Typically it
> > happens in a few seconds, after that you should be able to use that
> > partition again. If your problem persists I recommend figuring out why
> > leader election does not happen.
> > You might be able to work with other partitions, at least those that have
> > leaders on brokers that are up.
> >
> > Cheers,
> > Sandor Murakozi
> >
> > On Tue, Feb 20, 2018 at 9:00 AM, Behrang  wrote:
> >
> > > Hi,
> > >
> > > I have a Kafka cluster with 3 nodes.
> > >
> > > I pass the nodes in the cluster to a consumer app I am building as
> > > bootstrap servers.
> > >
> > > When one of the nodes in the cluster is down, the consumer group
> > sometimes
> > > CAN read records from the server but sometimes CAN NOT.
> > >
> > > In both cases, the same Kafka node is down.
> > >
> > > Is this behavior normal? Isn't it enough to only have one of the nodes
> in
> > > the Kafka cluster be up and running? I have not delved much into setup
> > and
> > > administration of Kafka clusters, but I thought Kafka uses the nodes
> for
> > HA
> > > and as long as one node is up and running, the cluster remains healthy
> > and
> > > working.
> > >
> > > Best regards,
> > > Behrang Saeedzadeh
> > >
> >
>