Re: Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread karan alang
Hi Vahid,
here is the output of the GetOffsetShell commands (with --time -1 & -2)

$KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
--topic topicPurge --time -2 --partitions 0,1,2

topicPurge:0:67

topicPurge:1:67

topicPurge:2:66

Karans-MacBook-Pro-3:config karanalang$
$KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
--topic topicPurge --time -1 --partitions 0,1,2

topicPurge:0:67

topicPurge:1:67

topicPurge:2:66


So, how do I interpret the above? I was expecting the offsets in ZooKeeper to
be purged too and shown as 0; however, that is not the case. (The observation
seems to tally with what you described in your email, I think.)

Also, the consumer is not able to read any data, so I guess the data is
actually purged?

However, that also brings up additional questions.

I was using the GetOffsetShell command to get the message count, but it seems
that is not necessarily the right way.

What command should be used to get the count?
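
One way to approximate the retained message count per partition is to take the
difference between the latest offset (--time -1) and the earliest valid offset
(--time -2). A minimal sketch, assuming the same broker list and topic as above
(it only counts messages that have not yet been purged):

LATEST=$($KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:6092 --topic topicPurge --time -1 --partitions 0 | cut -d: -f3)
EARLIEST=$($KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:6092 --topic topicPurge --time -2 --partitions 0 | cut -d: -f3)
echo "Retained messages in partition 0: $(( LATEST - EARLIEST ))"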

On Thu, Jun 22, 2017 at 8:34 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Karan,
>
> Just to clarify, with `--time -1` you are getting back the latest offset
> of the partition.
> If you do `--time -2` you'll get the earliest valid offset.
>
> So, let's say the latest offset of partition 0 of topic 'test' is 100.
> When you publish 5 messages to the partition, and before retention policy
> kicks in,
> - with `--time -1` you should get test:0:105
> - with `--time -2` you should get test:0:100
>
> But after retention policy kicks in and old messages are removed,
> - with `--time -1` you should get test:0:105
> - with `--time -2` you should get test:0:105
>
> Could you please advise whether you're seeing a different behavior?
>
> Thanks.
> --Vahid
>
>
>
>
> From:   "Vahid S Hashemian" 
> To: users@kafka.apache.org
> Date:   06/22/2017 06:43 PM
> Subject:Re: Deleting/Purging data from Kafka topics (Kafka 0.10)
>
>
>
> Hi Karan,
>
> I think the issue is in verification step. Because the start and end
> offsets are not going to be reset when messages are deleted.
> Have you checked whether a consumer would see the messages that are
> supposed to be deleted? Thanks.
>
> --Vahid
>
>
>
> From:   karan alang 
> To: users@kafka.apache.org
> Date:   06/22/2017 06:09 PM
> Subject:Re: Deleting/Purging data from Kafka topics (Kafka 0.10)
>
>
>
> Hi Vahid,
>
> somehow, the changes suggested don't seem to be taking effect, and i dont
> see the data being purged from the topic.
>
> Here are the steps i followed -
>
> 1) topic is set with param -- retention.ms=1000
>
> $KAFKA10_HOME/bin/kafka-topics.sh --describe --topic topicPurge
> --zookeeper
> localhost:2161
>
> Topic:topicPurge PartitionCount:3 ReplicationFactor:3 Configs:retention.ms
> =1000
>
> Topic: topicPurge Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
>
> Topic: topicPurge Partition: 1 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
>
> Topic: topicPurge Partition: 2 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
>
>
> 2) There are 4 brokers, and in the server.properties (for each of the
> brokers), i've modified the following property
>
> log.retention.check.interval.ms=3
>
> I am expecting the data to be purged every 30 secs based on property -
> log.retention.check.interval.ms, however, that does not seem to be
> happening.
>
> 3) Here is the command to check the offsets
>
> $KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
> --topic topicPurge --time -1 --partitions 0,1,2
>
> topicPurge:0:67
>
> topicPurge:1:67
>
> topicPurge:2:66
>
>
> Any ideas on what the issue might be ?
>
>
>
>
>
>
>
> On Thu, Jun 22, 2017 at 1:31 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi Karan,
> >
> > The other broker config that plays a role here is
> > "log.retention.check.interval.ms".
> > For a low log retention time like in your example if this broker config
> > value is much higher, then the broker doesn't delete old logs regular
> > enough.
> >
> > --Vahid
> >
> >
> >
> > From:   karan alang 
> > To: users@kafka.apache.org
> > Date:   06/22/2017 12:27 PM
> > Subject:Deleting/Purging data from Kafka topics (Kafka 0.10)
> >
> >
> >
> > Hi All -
> > How do i go about deleting data from Kafka Topics ? I've Kafka 0.10
> > installed.
> >
> > I tried setting the parameter of the topic as shown below ->
> >
> > $KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
> > --topic mmtopic6 --config retention.ms=1000
> >  I was expecting to have the data purged in about a min or so ..
> however,
> > i
> > dont see that happening ..
> > any ideas on what needs to be done ?
> >
> >
> >
> >
> >
>
>
>
>
>
>
>
>
>


Re: Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread Vahid S Hashemian
Hi Karan,

Just to clarify, with `--time -1` you are getting back the latest offset 
of the partition.
If you do `--time -2` you'll get the earliest valid offset.

So, let's say the latest offset of partition 0 of topic 'test' is 100.
When you publish 5 messages to the partition, and before retention policy 
kicks in,
- with `--time -1` you should get test:0:105
- with `--time -2` you should get test:0:100

But after retention policy kicks in and old messages are removed,
- with `--time -1` you should get test:0:105
- with `--time -2` you should get test:0:105

Could you please advise whether you're seeing a different behavior?

Thanks.
--Vahid




From:   "Vahid S Hashemian" 
To: users@kafka.apache.org
Date:   06/22/2017 06:43 PM
Subject:Re: Deleting/Purging data from Kafka topics (Kafka 0.10)



Hi Karan,

I think the issue is in the verification step, because the start and end
offsets are not going to be reset when messages are deleted.
Have you checked whether a consumer would see the messages that are 
supposed to be deleted? Thanks.

--Vahid



From:   karan alang 
To: users@kafka.apache.org
Date:   06/22/2017 06:09 PM
Subject:Re: Deleting/Purging data from Kafka topics (Kafka 0.10)



Hi Vahid,

Somehow, the changes suggested don't seem to be taking effect, and I don't
see the data being purged from the topic.

Here are the steps I followed:

1) The topic is set with retention.ms=1000

$KAFKA10_HOME/bin/kafka-topics.sh --describe --topic topicPurge 
--zookeeper
localhost:2161

Topic:topicPurge PartitionCount:3 ReplicationFactor:3 Configs:retention.ms
=1000

Topic: topicPurge Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: topicPurge Partition: 1 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3

Topic: topicPurge Partition: 2 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0


2) There are 4 brokers, and in server.properties (for each of the
brokers) I've modified the following property:

log.retention.check.interval.ms=30000

I am expecting the data to be purged every 30 secs based on the property
log.retention.check.interval.ms; however, that does not seem to be
happening.

3) Here is the command to check the offsets

$KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
--topic topicPurge --time -1 --partitions 0,1,2

topicPurge:0:67

topicPurge:1:67

topicPurge:2:66


Any ideas on what the issue might be ?







On Thu, Jun 22, 2017 at 1:31 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Karan,
>
> The other broker config that plays a role here is
> "log.retention.check.interval.ms".
> For a low log retention time like in your example if this broker config
> value is much higher, then the broker doesn't delete old logs regular
> enough.
>
> --Vahid
>
>
>
> From:   karan alang 
> To: users@kafka.apache.org
> Date:   06/22/2017 12:27 PM
> Subject:Deleting/Purging data from Kafka topics (Kafka 0.10)
>
>
>
> Hi All -
> How do i go about deleting data from Kafka Topics ? I've Kafka 0.10
> installed.
>
> I tried setting the parameter of the topic as shown below ->
>
> $KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
> --topic mmtopic6 --config retention.ms=1000
>  I was expecting to have the data purged in about a min or so .. 
however,
> i
> dont see that happening ..
> any ideas on what needs to be done ?
>
>
>
>
>










Re: kafka version 0.10.2.1 consumer can not get the message

2017-06-22 Thread Caokun (Jack, Platform)
The issue was in the ZooKeeper and Kafka configuration.

Kafka server.properties:
#advertised.host.name=10.179.165.7 # commented out on 20170621
#advertised.listeners=PLAINTEXT://0.0.0.0:9080 # commented out on 20170621
#port=9080 # commented out on 20170621
listeners=PLAINTEXT://10.179.165.7:9080 # changed from 0.0.0.0 to 10.179.165.7 on 20170621
delete.topic.enable=true # added on 20170621

ZooKeeper zoo.cfg:
server.0=10.179.165.7:2287:3387 # added on 20170621

From: Caokun (Jack, Platform)
Sent: June 20, 2017 23:07
To: 'users@kafka.apache.org'
Cc: Xukeke
Subject: kafka version 0.10.2.1 consumer can not get the message

Hello experts,
I wrote a Kafka demo in Java.
The producer can send messages but the consumer cannot get them.
My Kafka configuration is OK:
./kafka-console-producer.sh --broker-list localhost:9080 --topic testkun
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkun
--from-beginning

The following is the Java code of the consumer, producer, app, and interface.
Thanks a lot

package com.huawei.business;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class hwKafkaConsumer extends Thread {

    private final String topic;

    public hwKafkaConsumer(final String topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.179.165.7:9080");
        props.put("group.id", KafkaProperties.groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // The following settings belong to the old (Scala) consumer and are ignored
        // by the new Java consumer; they are kept only because the original code set them.
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("zookeeper.session.timeout.ms", "4");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("rebalance.backoff.ms", "2000");
        props.put("rebalance.max.retries", "10");
        // The new consumer needs deserializers (serializers are only used by producers).
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // consume from the earliest message

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        System.out.println("consumer: the topic is " + this.topic);

        while (true) {
            System.out.println("consumer: polling for messages in while loop");
            ConsumerRecords<String, String> records = consumer.poll(100); // seems to hang here
            System.out.println("consumer: poll returned " + records.count() + " records");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

Can anyone help find the problem?
Thanks a lot
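
A quick way to narrow this down (a sketch; --bootstrap-server is supported by
the 0.10.2 console consumer, and the address below is simply the one the Java
consumer uses): run the console consumer against the same listener as the Java
client, instead of going through ZooKeeper, so both paths are compared like for
like:

./kafka-console-consumer.sh --bootstrap-server 10.179.165.7:9080 --topic testkun --from-beginning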

The producer test is as follows:

package com.huawei.business;
import java.util.Date;
import java.util.Properties;


import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

//import kafka.producer.KeyedMessage;
//import kafka.producer.ProducerConfig;

public class hwKafkaProducer extends Thread
{
//private final kafka.javaapi.producer.Producer producer;
private final String topic;
// private final Properties props = new Properties();

public hwKafkaProducer(final String topic)
{
   // props.put("serializer.class", "kafka.serializer.StringEncoder");
   // props.put("metadata.broker.list", "10.179.165.7:9080");
   // producer = new kafka.javaapi.producer.Producer(new 
ProducerConfig(props));

this.topic = topic;
}
@Override
public void run()
{
int messageNo = 1;

Properties props = new Properties();
props.put("bootstrap.servers", "10.179.165.7:9080");
// "serializer.class" and "metadata.broker.list" are old (Scala) producer settings and
// are ignored by the new Java producer; kept only because the original code set them.
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.179.165.7:9080");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("group.id", KafkaProperties.groupId);
props.put("k

Re: Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread Vahid S Hashemian
Hi Karan,

I think the issue is in the verification step, because the start and end
offsets are not going to be reset when messages are deleted.
Have you checked whether a consumer would see the messages that are 
supposed to be deleted? Thanks.

--Vahid



From:   karan alang 
To: users@kafka.apache.org
Date:   06/22/2017 06:09 PM
Subject:Re: Deleting/Purging data from Kafka topics (Kafka 0.10)



Hi Vahid,

Somehow, the changes suggested don't seem to be taking effect, and I don't
see the data being purged from the topic.

Here are the steps I followed:

1) The topic is set with retention.ms=1000

$KAFKA10_HOME/bin/kafka-topics.sh --describe --topic topicPurge 
--zookeeper
localhost:2161

Topic:topicPurge PartitionCount:3 ReplicationFactor:3 Configs:retention.ms
=1000

Topic: topicPurge Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: topicPurge Partition: 1 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3

Topic: topicPurge Partition: 2 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0


2) There are 4 brokers, and in server.properties (for each of the
brokers) I've modified the following property:

log.retention.check.interval.ms=30000

I am expecting the data to be purged every 30 secs based on the property
log.retention.check.interval.ms; however, that does not seem to be
happening.

3) Here is the command to check the offsets

$KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
--topic topicPurge --time -1 --partitions 0,1,2

topicPurge:0:67

topicPurge:1:67

topicPurge:2:66


Any ideas on what the issue might be ?







On Thu, Jun 22, 2017 at 1:31 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Karan,
>
> The other broker config that plays a role here is
> "log.retention.check.interval.ms".
> For a low log retention time like in your example if this broker config
> value is much higher, then the broker doesn't delete old logs regular
> enough.
>
> --Vahid
>
>
>
> From:   karan alang 
> To: users@kafka.apache.org
> Date:   06/22/2017 12:27 PM
> Subject:Deleting/Purging data from Kafka topics (Kafka 0.10)
>
>
>
> Hi All -
> How do i go about deleting data from Kafka Topics ? I've Kafka 0.10
> installed.
>
> I tried setting the parameter of the topic as shown below ->
>
> $KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
> --topic mmtopic6 --config retention.ms=1000
>  I was expecting to have the data purged in about a min or so .. 
however,
> i
> dont see that happening ..
> any ideas on what needs to be done ?
>
>
>
>
>






Re: [VOTE] 0.11.0.0 RC2

2017-06-22 Thread Ismael Juma
A quick note on notable changes since rc1:

1. A significant performance improvement if transactions are enabled:
https://github.com/apache/kafka/commit/f239f1f839f8bcbd80cce2a4a8643e15d340be8e
2. Fixed a controller regression if many brokers are started
simultaneously:
https://github.com/apache/kafka/commit/c0033b0e0b9e56242752c82f15c6388d041914a1
3. Fixed a couple of Connect regressions:
https://github.com/apache/kafka/commit/c029960bf4ae2cd79b22886f4ee519c4af0bcc8b
and
https://github.com/apache/kafka/commit/1d65f15f2b656b7817eeaf6ee1d36eb3e2cf063f
4. Fixed an important log cleaner issue:
https://github.com/apache/kafka/commit/186e3d5efc79ed803f0915d472ace77cbec88694

Full diff:
https://github.com/apache/kafka/compare/5b351216621f52a471c21826d0dec3ce3187e697...0.11.0.0-rc2

Ismael

On Fri, Jun 23, 2017 at 2:16 AM, Ismael Juma  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the third candidate for release of Apache Kafka 0.11.0.0.
>
> This is a major version release of Apache Kafka. It includes 32 new KIPs.
> See the release notes and release plan (https://cwiki.apache.org/
> confluence/display/KAFKA/Release+Plan+0.11.0.0) for more details. A few
> feature highlights:
>
> * Exactly-once delivery and transactional messaging
> * Streams exactly-once semantics
> * Admin client with support for topic, ACLs and config management
> * Record headers
> * Request rate quotas
> * Improved resiliency: replication protocol improvement and
> single-threaded controller
> * Richer and more efficient message format
>
> Release notes for the 0.11.0.0 release:
> http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, June 27, 6pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/javadoc/
>
> * Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> 8698fa1f41102f1664b05baa4d6953fc9564d91e
>
> * Documentation:
> http://kafka.apache.org/0110/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0110/protocol.html
>
> * Successful Jenkins builds for the 0.11.0 branch:
> Unit/integration tests: https://builds.apache.org/job/
> kafka-0.11.0-jdk7/187/
> System tests: pending (will send an update tomorrow)
>
> /**
>
> Thanks,
> Ismael
>


[VOTE] 0.11.0.0 RC2

2017-06-22 Thread Ismael Juma
Hello Kafka users, developers and client-developers,

This is the third candidate for release of Apache Kafka 0.11.0.0.

This is a major version release of Apache Kafka. It includes 32 new KIPs.
See the release notes and release plan (
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0)
for more details. A few feature highlights:

* Exactly-once delivery and transactional messaging
* Streams exactly-once semantics
* Admin client with support for topic, ACLs and config management
* Record headers
* Request rate quotas
* Improved resiliency: replication protocol improvement and single-threaded
controller
* Richer and more efficient message format

Release notes for the 0.11.0.0 release:
http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Tuesday, June 27, 6pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
http://home.apache.org/~ijuma/kafka-0.11.0.0-rc2/javadoc/

* Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8698fa1f41102f1664b05baa4d6953fc9564d91e

* Documentation:
http://kafka.apache.org/0110/documentation.html

* Protocol:
http://kafka.apache.org/0110/protocol.html

* Successful Jenkins builds for the 0.11.0 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/187/
System tests: pending (will send an update tomorrow)

/**

Thanks,
Ismael


Re: Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread karan alang
Hi Vahid,

Somehow, the changes suggested don't seem to be taking effect, and I don't
see the data being purged from the topic.

Here are the steps I followed:

1) The topic is set with retention.ms=1000

$KAFKA10_HOME/bin/kafka-topics.sh --describe --topic topicPurge --zookeeper
localhost:2161

Topic:topicPurge PartitionCount:3 ReplicationFactor:3 Configs:retention.ms
=1000

Topic: topicPurge Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: topicPurge Partition: 1 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3

Topic: topicPurge Partition: 2 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0


2) There are 4 brokers, and in server.properties (for each of the
brokers) I've modified the following property:

log.retention.check.interval.ms=30000

I am expecting the data to be purged every 30 secs based on the property
log.retention.check.interval.ms; however, that does not seem to be
happening.

3) Here is the command to check the offsets

$KAFKA10_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:6092,localhost:6093,localhost:6094,localhost:6095
--topic topicPurge --time -1 --partitions 0,1,2

topicPurge:0:67

topicPurge:1:67

topicPurge:2:66


Any ideas on what the issue might be ?
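
One more thing worth checking here (a sketch, assuming the same topic; the
segment.ms value below is only an illustration): retention is applied at the
log-segment level, and in most cases the active (current) segment is not
eligible for deletion until it rolls. With the default segment size of 1 GB, a
small test topic may not roll a segment for a long time, so there is nothing
for the retention check to remove. Forcing frequent segment rolls usually makes
the effect of a low retention.ms visible quickly:

$KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter \
  --topic topicPurge --config segment.ms=10000

Once the active segment rolls and retention.ms plus the check interval have
passed, the earliest offset reported by GetOffsetShell with --time -2 should
catch up to the latest offset reported with --time -1.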







On Thu, Jun 22, 2017 at 1:31 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Karan,
>
> The other broker config that plays a role here is
> "log.retention.check.interval.ms".
> For a low log retention time like in your example if this broker config
> value is much higher, then the broker doesn't delete old logs regular
> enough.
>
> --Vahid
>
>
>
> From:   karan alang 
> To: users@kafka.apache.org
> Date:   06/22/2017 12:27 PM
> Subject:Deleting/Purging data from Kafka topics (Kafka 0.10)
>
>
>
> Hi All -
> How do i go about deleting data from Kafka Topics ? I've Kafka 0.10
> installed.
>
> I tried setting the parameter of the topic as shown below ->
>
> $KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
> --topic mmtopic6 --config retention.ms=1000
>  I was expecting to have the data purged in about a min or so .. however,
> i
> dont see that happening ..
> any ideas on what needs to be done ?
>
>
>
>
>


How does Zookeeper node failure impact Kafka cluster?

2017-06-22 Thread mayank rathi
Hello All,

Let's assume I have a 3-node ZooKeeper ensemble and a 3-node Kafka cluster
in my Kafka environment, and one of the ZK nodes goes down.

What would be the impact of 1 ZK node failure on the Kafka cluster?

I am just trying to understand the difference between a 2-node ZooKeeper
ensemble and a 3-node ZooKeeper ensemble with one node down.

Thanks in advance.
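
For what it's worth, the usual arithmetic here (assuming a standard ZooKeeper
quorum, i.e. a strict majority of the ensemble must be up):

  majority(3) = 2  -> a 3-node ensemble keeps working with 1 node down
  majority(2) = 2  -> a 2-node ensemble cannot tolerate any node failure

So a 3-node ensemble with one node down still has a quorum and Kafka keeps
working, but it can no longer survive a second ZooKeeper failure.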

-- 
NOTICE: This email message is for the sole use of the intended recipient(s)
and may contain confidential and privileged information. Any unauthorized
review, use, disclosure or distribution is prohibited. If you are not the
intended recipient, please contact the sender by reply email and destroy
all copies of the original message.


Re: Aggregation operations and Joins not working as I would expect.

2017-06-22 Thread Matthias J. Sax
Hi,

there are two things:

1) The aggregation operator produces an output record each time the aggregate
is updated. Thus, you would get 6 records in your example. At the same
time, we deduplicate consecutive outputs with an internal cache, and the
cache is flushed non-deterministically (either partly flushed on evict, or
completely flushed on commit).

see:
http://docs.confluent.io/current/streams/developer-guide.html#memory-management

2) For the join, the synchronization of both streams based on timestamps
is best effort. Thus, when the order event arrives, it might be the
case that the corresponding click was not yet processed. Thus, you get
a result with a null click (the "UNKNOWN" page). Note, when the click is
processed later, you will get the result you expect.

see:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics



-Matthias


On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
> Hi all,
> 
> I’m playing with Kafka Streams 0.10.2.1 and I’m having some issues here which 
> I hope you can help me to clarify/understand.
> 
> In a hypothetical scenario, I have 2 source streams – clicks and orders – 
> which I’m trying to join to match determine from which page the purchase has 
> been made. I also want to count the number of purchased items per user. This 
> is what my code looks like – you can ignore annotation and any other 
> Spring-related code:
> 
> 
> 
> @Getter
> 
> @ToString
> 
> public class Order {
> 
> 
>   private long timestamp;
> 
>   private String user;
> 
>   private String pos;
> 
>   private int totalItems;
> 
>   private Double grandTotal;
> 
>   private String country;
> 
> …
> }
> 
> 
> @Getter
> 
> @ToString
> 
> public class Click {
> 
> 
>   private long timestamp;
> 
>   private String system;
> 
>   private String user;
> 
>   private String page;
> 
>   private String action;
> 
> …
> 
> }
> 
> 
> @Getter
> 
> @ToString
> 
> public class Purchase {
> 
> 
>   private long timestamp;
> 
>   private String user;
> 
>   private String page;
> 
>   private String pos;
> 
>   private String country;
> 
> …
> }
> 
> 
> @Getter
> 
> @ToString
> 
> public class PurchaseHistory {
> 
> 
>   private String user;
> 
>   private int itemsBought;
> 
> …
> }
> 
> 
> @Component
> 
> @Slf4j
> 
> public class PurchaseStream implements StreamRunner {
> 
> 
>   private @Value("${spring.application.name}") String appName;
> 
>   private final KStreamBuilder kStreamBuilder;
> 
>   private KafkaStreams kafkaStreams;
> 
>   private ApplicationProperties properties;
> 
> 
>   @VisibleForTesting
> 
>   void setAppName(String appName) {
> 
> this.appName = appName;
> 
>   }
> 
> 
>   private Properties buildProperties() {
> 
> Properties props = new Properties();
> 
> props.put("group.id", "purchases-stream");
> 
> props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
> 
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
> 
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
> properties.getKafkaBroker());
> 
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
> properties.getReplicationFactor());
> 
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> properties.getTimestampExtractor());
> 
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
> properties.getCommitInterval());
> 
> return props;
> 
>   }
> 
> 
>   public PurchaseStream(ApplicationProperties properties) {
> 
> this.properties = properties;
> 
> 
> SerdeFactory serdeFactory = new JsonSerdeFactory();
> 
> Serde stringSerde = Serdes.String();
> 
> 
> kStreamBuilder = new KStreamBuilder();
> 
> 
> KStream clickKStream = kStreamBuilder
> 
> .stream(stringSerde, serdeFactory.serdeFor(Click.class), 
> properties.getClickStreamTopic())
> 
> .filter((k, click) -> “PAY".equals(click.getAction()))
> 
> .map((k, click) -> new KeyValue<>(click.getUser(), click));
> 
> 
> KStream ordersKStream = kStreamBuilder.stream(stringSerde, 
> serdeFactory.serdeFor(Order.class),
> 
> properties.getOrderStreamTopic());
> 
> 
> KStream purchasesKStream = ordersKStream
> 
> .map((k, order) -> new KeyValue<>(order.getUser(),
> 
> Purchase
> 
> .builder()
> 
> .timestamp(order.getTimestamp())
> 
> .user(order.getUser())
> 
> .pos(order.getPos())
> 
> .country(order.getCountry())
> 
> .build()))
> 
> .leftJoin(clickKStream,
> 
> (purchase, click) -> Purchase
> 
> .builder(purchase)
> 
> .page(click == null ? "UNKNOWN" : click.getPage())
> 
> .build(),
> 
> JoinWindows.of(properties.getPurchasesJoinWindow()).until(
> 
> 2 * properties.getPurchasesJoinWindow() + 1),
> 
> stringSerde, serdeFactory.serdeFor(Purchase.class), 
> serdeFactory.serdeFor(Click.class));
> 
> purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Pu

Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread karan alang
Hey Subhash,
thanks, I was able to test this out with a 1-partition topic and verify this.

On Thu, Jun 22, 2017 at 1:39 PM, Subhash Sriram 
wrote:

> Hi Karan,
>
> Yeah, so as to Paolo's point, keep in mind that Kafka does not guarantee
> order across partitions, only within a partition. If you publish messages
> to a topic with 3 partitions, it will only be guaranteed that they are
> consumed in order within the partition.
>
> You can retry your test by publishing to a single partition topic. When you
> consume, you should see that it is all in order.
>
> I hope that helps.
>
> Thanks,
> Subhash
>
> On Thu, Jun 22, 2017 at 4:37 PM, karan alang 
> wrote:
>
> > Hi Subhash,
> >
> > number of partitions - 3
> >
> > On Thu, Jun 22, 2017 at 12:37 PM, Subhash Sriram <
> subhash.sri...@gmail.com
> > >
> > wrote:
> >
> > > How many partitions are in your topic?
> > >
> > > On Thu, Jun 22, 2017 at 3:33 PM, karan alang 
> > > wrote:
> > >
> > > > Hi All -
> > > >
> > > > version - kafka 0.10
> > > > I'm publishing data into Kafka topic using command line,
> > > > and reading the data using kafka console consumer
> > > >
> > > > *Publish command ->*
> > > >
> > > > $KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
> > > > --max-messages 100 --broker-list
> > > > localhost:9092,localhost:9093,localhost:9094,localhost:9095
> > > > --producer.config $KAFKA_HOME/config/producer.properties
> > > >
> > > > *Console Consumer :*
> > > >
> > > > $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper
> localhost:2161
> > > > --topic mmtopic1 --from-beginning
> > > >
> > > > What i see is that the Kafka consumer is not reading the data in
> > sequence
> > > > i.e. the data on console is seen, but not in order it was published.
> > > >
> > > > Is that expected ?
> > > > what do i need to do to ensure the Kafka consumer reads the data in
> > > > sequence ?
> > > >
> > >
> >
>


Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread karan alang
got it, thanks!

On Thu, Jun 22, 2017 at 12:40 PM, Paolo Patierno  wrote:

> Kafka guarantees messages ordering at partition level not across
> partitions at topic level. Having out of order reading maybe possible If
> your topic has more than one partition.
>
> From: Subhash Sriram
> Sent: Thursday, 22 June, 21:37
> Subject: Re: Kafka 0.10 - kafka console consumer not reading the data in
> order that it was published
> To: users@kafka.apache.org
>
>
> How many partitions are in your topic? On Thu, Jun 22, 2017 at 3:33 PM,
> karan alang wrote: > Hi All - > > version - kafka 0.10 > I'm publishing
> data into Kafka topic using command line, > and reading the data using
> kafka console consumer > > *Publish command ->* > > 
> $KAFKA_HOME/bin/kafka-verifiable-producer.sh
> --topic mmtopic1 > --max-messages 100 --broker-list >
> localhost:9092,localhost:9093,localhost:9094,localhost:9095 >
> --producer.config $KAFKA_HOME/config/producer.properties > > *Console
> Consumer :* > > $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper
> localhost:2161 > --topic mmtopic1 --from-beginning > > What i see is that
> the Kafka consumer is not reading the data in sequence > i.e. the data on
> console is seen, but not in order it was published. > > Is that expected ?
> > what do i need to do to ensure the Kafka consumer reads the data in >
> sequence ? >
>
>


Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread Subhash Sriram
Hi Karan,

Yeah, so as to Paolo's point, keep in mind that Kafka does not guarantee
order across partitions, only within a partition. If you publish messages
to a topic with 3 partitions, it will only be guaranteed that they are
consumed in order within the partition.

You can retry your test by publishing to a single partition topic. When you
consume, you should see that it is all in order.
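
A minimal sketch of that retry (the single-partition topic name below,
mmtopic1p, is just for illustration; the ZooKeeper and broker addresses are
the ones used earlier in this thread):

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2161 \
  --topic mmtopic1p --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1p --max-messages 100 \
  --broker-list localhost:9092,localhost:9093,localhost:9094,localhost:9095
$KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161 \
  --topic mmtopic1p --from-beginning

With a single partition the consumer should print the messages in exactly the
order they were produced.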

I hope that helps.

Thanks,
Subhash

On Thu, Jun 22, 2017 at 4:37 PM, karan alang  wrote:

> Hi Subhash,
>
> number of partitions - 3
>
> On Thu, Jun 22, 2017 at 12:37 PM, Subhash Sriram  >
> wrote:
>
> > How many partitions are in your topic?
> >
> > On Thu, Jun 22, 2017 at 3:33 PM, karan alang 
> > wrote:
> >
> > > Hi All -
> > >
> > > version - kafka 0.10
> > > I'm publishing data into Kafka topic using command line,
> > > and reading the data using kafka console consumer
> > >
> > > *Publish command ->*
> > >
> > > $KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
> > > --max-messages 100 --broker-list
> > > localhost:9092,localhost:9093,localhost:9094,localhost:9095
> > > --producer.config $KAFKA_HOME/config/producer.properties
> > >
> > > *Console Consumer :*
> > >
> > > $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161
> > > --topic mmtopic1 --from-beginning
> > >
> > > What i see is that the Kafka consumer is not reading the data in
> sequence
> > > i.e. the data on console is seen, but not in order it was published.
> > >
> > > Is that expected ?
> > > what do i need to do to ensure the Kafka consumer reads the data in
> > > sequence ?
> > >
> >
>


Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread karan alang
Hi Subhash,

number of partitions - 3

On Thu, Jun 22, 2017 at 12:37 PM, Subhash Sriram 
wrote:

> How many partitions are in your topic?
>
> On Thu, Jun 22, 2017 at 3:33 PM, karan alang 
> wrote:
>
> > Hi All -
> >
> > version - kafka 0.10
> > I'm publishing data into Kafka topic using command line,
> > and reading the data using kafka console consumer
> >
> > *Publish command ->*
> >
> > $KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
> > --max-messages 100 --broker-list
> > localhost:9092,localhost:9093,localhost:9094,localhost:9095
> > --producer.config $KAFKA_HOME/config/producer.properties
> >
> > *Console Consumer :*
> >
> > $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161
> > --topic mmtopic1 --from-beginning
> >
> > What i see is that the Kafka consumer is not reading the data in sequence
> > i.e. the data on console is seen, but not in order it was published.
> >
> > Is that expected ?
> > what do i need to do to ensure the Kafka consumer reads the data in
> > sequence ?
> >
>


Re: Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread Vahid S Hashemian
Hi Karan,

The other broker config that plays a role here is
"log.retention.check.interval.ms".
For a low log retention time like in your example, if this broker config
value is much higher, then the broker doesn't delete old logs regularly
enough.

--Vahid
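
For reference, this value is in milliseconds and is set broker-side in
server.properties (a sketch; 30000 is just an example meaning "check every
30 seconds", and each broker needs a restart to pick it up):

# server.properties (per broker)
log.retention.check.interval.ms=30000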



From:   karan alang 
To: users@kafka.apache.org
Date:   06/22/2017 12:27 PM
Subject:Deleting/Purging data from Kafka topics (Kafka 0.10)



Hi All -
How do I go about deleting data from Kafka topics? I have Kafka 0.10
installed.

I tried setting the following parameter on the topic:

$KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
--topic mmtopic6 --config retention.ms=1000

I was expecting the data to be purged in about a minute or so; however, I
don't see that happening.
Any ideas on what needs to be done?






Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread Paolo Patierno
Kafka guarantees message ordering at the partition level, not across partitions
at the topic level. Out-of-order reading is possible if your topic has more
than one partition.

From: Subhash Sriram
Sent: Thursday, 22 June, 21:37
Subject: Re: Kafka 0.10 - kafka console consumer not reading the data in order 
that it was published
To: users@kafka.apache.org


How many partitions are in your topic?

On Thu, Jun 22, 2017 at 3:33 PM, karan alang wrote:

> Hi All -
>
> version - kafka 0.10
> I'm publishing data into Kafka topic using command line,
> and reading the data using kafka console consumer
>
> *Publish command ->*
>
> $KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
> --max-messages 100 --broker-list
> localhost:9092,localhost:9093,localhost:9094,localhost:9095
> --producer.config $KAFKA_HOME/config/producer.properties
>
> *Console Consumer :*
>
> $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161
> --topic mmtopic1 --from-beginning
>
> What i see is that the Kafka consumer is not reading the data in sequence
> i.e. the data on console is seen, but not in order it was published.
>
> Is that expected ?
> what do i need to do to ensure the Kafka consumer reads the data in
> sequence ?



Re: Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread Subhash Sriram
How many partitions are in your topic?

On Thu, Jun 22, 2017 at 3:33 PM, karan alang  wrote:

> Hi All -
>
> version - kafka 0.10
> I'm publishing data into Kafka topic using command line,
> and reading the data using kafka console consumer
>
> *Publish command ->*
>
> $KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
> --max-messages 100 --broker-list
> localhost:9092,localhost:9093,localhost:9094,localhost:9095
> --producer.config $KAFKA_HOME/config/producer.properties
>
> *Console Consumer :*
>
> $KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161
> --topic mmtopic1 --from-beginning
>
> What i see is that the Kafka consumer is not reading the data in sequence
> i.e. the data on console is seen, but not in order it was published.
>
> Is that expected ?
> what do i need to do to ensure the Kafka consumer reads the data in
> sequence ?
>


Kafka 0.10 - kafka console consumer not reading the data in order that it was published

2017-06-22 Thread karan alang
Hi All -

version - kafka 0.10
I'm publishing data into Kafka topic using command line,
and reading the data using kafka console consumer

*Publish command ->*

$KAFKA_HOME/bin/kafka-verifiable-producer.sh --topic mmtopic1
--max-messages 100 --broker-list
localhost:9092,localhost:9093,localhost:9094,localhost:9095
--producer.config $KAFKA_HOME/config/producer.properties

*Console Consumer :*

$KAFKA10_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2161
--topic mmtopic1 --from-beginning

What I see is that the Kafka consumer is not reading the data in sequence,
i.e. the data is shown on the console, but not in the order it was published.

Is that expected?
What do I need to do to ensure the Kafka consumer reads the data in
sequence?


Deleting/Purging data from Kafka topics (Kafka 0.10)

2017-06-22 Thread karan alang
Hi All -
How do I go about deleting data from Kafka topics? I have Kafka 0.10
installed.

I tried setting the following parameter on the topic:

$KAFKA10_HOME/bin/kafka-topics.sh --zookeeper localhost:2161 --alter
--topic mmtopic6 --config retention.ms=1000

I was expecting the data to be purged in about a minute or so; however, I
don't see that happening.
Any ideas on what needs to be done?


Can mirror maker automatically compress messages based on source settings

2017-06-22 Thread tao xiao
Hi team,

As per my experimentation, mirror maker doesn't compress messages sent to the
target broker if it is not configured to do so, even when the messages in the
source broker are compressed. I understand the current implementation of
mirror maker has no visibility into what compression codec the source
messages use. I want to know if there is any good way to discover the
compression setting and notify mirror maker?
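
As far as I know there is no built-in way for mirror maker to detect the source
codec, but a common workaround (a sketch; the property file names and the codec
are only illustrative) is to set the compression codec explicitly in the
producer config that mirror maker uses, so everything written to the target
cluster is compressed regardless of how it arrived:

# target-producer.properties used by mirror maker should contain:
#   compression.type=snappy
$KAFKA10_HOME/bin/kafka-mirror-maker.sh --consumer.config source-consumer.properties \
  --producer.config target-producer.properties --whitelist '.*'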


Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-22 Thread Eno Thereska
Answers inline: 

> On 22 Jun 2017, at 03:26, Guozhang Wang  wrote:
> 
> Thanks for the updated KIP, some more comments:
> 
> 1.The config name is "default.deserialization.exception.handler" while the
> interface class name is "RecordExceptionHandler", which is more general
> than the intended purpose. Could we rename the class name accordingly?

Sure.


> 
> 2. Could you describe the full implementation of "DefaultExceptionHandler",
> currently it is not clear to me how it is implemented with the configured
> value.
> 
> In addition, I think we do not need to include an additional
> "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure()
> function is mainly used for users to pass any customized parameters that is
> out of the Streams library; plus adding such additional config sounds
> over-complicated for a default exception handler. Instead I'd suggest we
> just provide two handlers (or three if people feel strong about the
> LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one
> for LogAndContinueOnExceptionHandler. And we can set
> LogAndContinueOnExceptionHandler
> by default.
> 

That's what I had originally. Jay mentioned he preferred one default class, 
with config options.
So with that approach, you'd have 2 config options, one for failing, one for 
continuing, and the one
exception handler would take those options during its configure() call.

After checking the other exception handlers in the code, I might revert back to 
what I originally had (2 default handlers) 
as Guozhang also re-suggests, but still have the interface extend Configurable. 
Guozhang, you ok with that? In that case
there is no need for the response config option.

Thanks
Eno


> 
> Guozhang
> 
> 
> 
> 
> 
> 
> 
> 
> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska  >
> wrote:
> 
>> Thanks Guozhang,
>> 
>> I’ve updated the KIP and hopefully addressed all the comments so far. In
>> the process also changed the name of the KIP to reflect its scope better:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+ 
>> 
>> deserialization+exception+handlers > 
>> confluence/display/KAFKA/KIP-161:+streams+deserialization+
>> exception+handlers>
>> 
>> Any other feedback appreciated, otherwise I’ll start the vote soon.
>> 
>> Thanks
>> Eno
>> 
>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang  wrote:
>>> 
>>> Eno, Thanks for bringing this proposal up and sorry for getting late on
>>> this. Here are my two cents:
>>> 
>>> 1. First some meta comments regarding "fail fast" v.s. "making
>> progress". I
>>> agree that in general we should better "enforce user to do the right
>> thing"
>>> in system design, but we also need to keep in mind that Kafka is a
>>> multi-tenant system, i.e. from a Streams app's pov you probably would not
>>> control the whole streaming processing pipeline end-to-end. E.g. Your
>> input
>>> data may not be controlled by yourself; it could be written by another
>> app,
>>> or another team in your company, or even a different organization, and if
>>> an error happens maybe you cannot fix "to do the right thing" just by
>>> yourself in time. In such an environment I think it is important to leave
>>> the door open to let users be more resilient. So I find the current
>>> proposal which does leave the door open for either fail-fast or make
>>> progress quite reasonable.
>>> 
>>> 2. On the other hand, if the question is whether we should provide a
>>> built-in "send to bad queue" handler from the library, I think that might
>>> be an overkill: with some tweaks (see my detailed comments below) on the
>>> API we can allow users to implement such handlers pretty easily. In
>> fact, I
>>> feel even "LogAndThresholdExceptionHandler" is not necessary as a
>> built-in
>>> handler, as it would then require users to specify the threshold via
>>> configs, etc. I think letting people provide such "eco-libraries" may be
>>> better.
>>> 
>>> 3. Regarding the CRC error: today we validate CRC on both the broker end
>>> upon receiving produce requests and on consumer end upon receiving fetch
>>> responses; and if the CRC validation fails in the former case it would
>> not
>>> be appended to the broker logs. So if we do see a CRC failure on the
>>> consumer side it has to be that either we have a flipped bit on the
>> broker
>>> disks or over the wire. For the first case it is fatal while for the
>> second
>>> it is retriable. Unfortunately we cannot tell which case it is when
>> seeing
>>> CRC validation failures. But in either case, just skipping and making
>>> progress seems not a good choice here, and hence I would personally
>> exclude
>>> these errors from the general serde errors to NOT leave the door open of
>>> making progress.
>>> 
>>> Currently such errors are thrown as KafkaException that wraps an
>>> InvalidRecordExcepti

Aggregation operations and Joins not working as I would expect.

2017-06-22 Thread Daniel Del Castillo Perez
Hi all,

I’m playing with Kafka Streams 0.10.2.1 and I’m having some issues here which I
hope you can help me clarify/understand.

In a hypothetical scenario, I have 2 source streams – clicks and orders – which
I’m trying to join in order to determine from which page the purchase has been
made. I also want to count the number of purchased items per user. This is what
my code looks like – you can ignore the annotations and any other Spring-related
code:



@Getter

@ToString

public class Order {


  private long timestamp;

  private String user;

  private String pos;

  private int totalItems;

  private Double grandTotal;

  private String country;

…
}


@Getter

@ToString

public class Click {


  private long timestamp;

  private String system;

  private String user;

  private String page;

  private String action;

…

}


@Getter

@ToString

public class Purchase {


  private long timestamp;

  private String user;

  private String page;

  private String pos;

  private String country;

…
}


@Getter

@ToString

public class PurchaseHistory {


  private String user;

  private int itemsBought;

…
}


@Component

@Slf4j

public class PurchaseStream implements StreamRunner {


  private @Value("${spring.application.name}") String appName;

  private final KStreamBuilder kStreamBuilder;

  private KafkaStreams kafkaStreams;

  private ApplicationProperties properties;


  @VisibleForTesting

  void setAppName(String appName) {

this.appName = appName;

  }


  private Properties buildProperties() {

Properties props = new Properties();

props.put("group.id", "purchases-stream");

props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");

props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
properties.getKafkaBroker());

props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
properties.getReplicationFactor());

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
properties.getTimestampExtractor());

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
properties.getCommitInterval());

return props;

  }


  public PurchaseStream(ApplicationProperties properties) {

this.properties = properties;


SerdeFactory serdeFactory = new JsonSerdeFactory();

Serde<String> stringSerde = Serdes.String();


kStreamBuilder = new KStreamBuilder();


KStream<String, Click> clickKStream = kStreamBuilder

.stream(stringSerde, serdeFactory.serdeFor(Click.class), 
properties.getClickStreamTopic())

.filter((k, click) -> "PAY".equals(click.getAction()))

.map((k, click) -> new KeyValue<>(click.getUser(), click));


KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde, 
serdeFactory.serdeFor(Order.class),

properties.getOrderStreamTopic());


KStream<String, Purchase> purchasesKStream = ordersKStream

.map((k, order) -> new KeyValue<>(order.getUser(),

Purchase

.builder()

.timestamp(order.getTimestamp())

.user(order.getUser())

.pos(order.getPos())

.country(order.getCountry())

.build()))

.leftJoin(clickKStream,

(purchase, click) -> Purchase

.builder(purchase)

.page(click == null ? "UNKNOWN" : click.getPage())

.build(),

JoinWindows.of(properties.getPurchasesJoinWindow()).until(

2 * properties.getPurchasesJoinWindow() + 1),

stringSerde, serdeFactory.serdeFor(Purchase.class), 
serdeFactory.serdeFor(Click.class));

purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class),

properties.getPurchasesTopic());


ordersKStream

.map((k, order) -> new KeyValue<>(order.getUser(),


PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTotalItems()).build()))

.groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class))

.aggregate(PurchaseHistoryAggregator::new,

(k, purchaseHistory, purchaseHistoryAggregator) -> 
purchaseHistoryAggregator.add(purchaseHistory),

serdeFactory.serdeFor(PurchaseHistoryAggregator.class), 
“purchaseHistoryStore")

.to(stringSerde, 
serdeFactory.serdeFor(PurchaseHistoryAggregator.class), 
properties.getPurchaseHistoryTopic());

  }


  protected KafkaStreams connect() {

log.info("Creating PurchaseStreams");

StreamsConfig streamsConfig = new StreamsConfig(buildProperties());

return new KafkaStreams(builder(), streamsConfig);

  }


  @Override

  public void run() {

log.info("Starting PurchaseStreams");

kafkaStreams = connect();

kafkaStreams.start();

log.info("Now started PurchaseStreams");

  }


  @Override

  public void stop() {

kafkaStreams.close();

kafkaStreams.cleanUp();

  }

…

}


This is my integration test:



public class PurchaseStreamIntegrationTest {


  private st

Re: Kafka 0.11.0 release

2017-06-22 Thread Guozhang Wang
Raghav,

We are going through the voting process now, expecting to have another RC
and release in a few more days.


Guozhang

On Thu, Jun 22, 2017 at 3:59 AM, Raghav  wrote:

> Hi
>
> Would anyone know when is the Kafka 0.11.0 scheduled to be released ?
>
> Thanks.
>
> --
> Raghav
>



-- 
-- Guozhang


Re: [VOTE] 0.11.0.0 RC1

2017-06-22 Thread Tom Crayford
That's fair, and nice find with the transaction performance improvement!

Once the RC is out, we'll do a final round of performance testing with the
new ProducerPerformance changes enabled.

I think it's fair that this shouldn't delay the release. Is there an
official stance on what should and shouldn't delay a release documented
somewhere?

Thanks

Tom Crayford
Heroku Kafka

On Thu, Jun 22, 2017 at 4:45 PM, Ismael Juma  wrote:

> Hi Tom,
>
> We are going to do another RC to include Apurva's significant performance
> improvement when transactions are enabled:
>
> https://github.com/apache/kafka/commit/f239f1f839f8bcbd80cce2a4a8643e
> 15d340be8e
>
> Given that, we can also include the ProducerPerformance changes that
> Apurva did to find and fix the performance issue.
>
> In my opinion, the ProducerPerformance change alone would not be enough
> reason for another RC as users can run the tool from trunk to test older
> releases. In any case, this is hypothetical at this point. :)
>
> And thanks for continuing your testing, it's very much appreciated!
>
> Ismael
>
> On Wed, Jun 21, 2017 at 8:03 PM, Tom Crayford 
> wrote:
>
>> That looks better than mine, nice! I think the tooling matters a lot to
>> the usability of the product we're shipping, being able to test out Kafka's
>> features on your own hardware/setup is very important to knowing if it can
>> work.
>>
>> On Wed, Jun 21, 2017 at 8:01 PM, Apurva Mehta 
>> wrote:
>>
>>> Hi Tom,
>>>
>>> I actually made modifications to the produce performance tool to do real
>>> transactions earlier this week as part of our benchmarking (results
>>> published here: bit.ly/kafka-eos-perf). I just submitted that patch
>>> here:
>>> https://github.com/apache/kafka/pull/3400/files
>>>
>>> I think my version is more complete since it runs the full gamut of APIs:
>>> initTransactions, beginTransaction, commitTransaction. Also, it is the
>>> version used for our published benchmarks.
>>>
>>> I am not sure that this tool is a blocker for the release though, since
>>> it
>>> doesn't really affect the usability of the feature any way.
>>>
>>> Thanks,
>>> Apurva
>>>
>>> On Wed, Jun 21, 2017 at 11:12 AM, Tom Crayford 
>>> wrote:
>>>
>>> > Hi there,
>>> >
>>> > I'm -1 (non-binding) on shipping this RC.
>>> >
>>> > Heroku has carried on performance testing with 0.11 RC1. We have
>>> updated
>>> > our test setup to use 0.11.0.0 RC1 client libraries. Without any of the
>>> > transactional features enabled, we get slightly better performance than
>>> > 0.10.2.1 with 10.2.1 client libraries.
>>> >
>>> > However, we attempted to run a performance test today with
>>> transactions,
>>> > idempotence and consumer read_committed enabled, but couldn't, because
>>> > enabling transactions requires the producer to call `initTransactions`
>>> > before starting to send messages, and the producer performance tool
>>> doesn't
>>> > allow for that.
>>> >
>>> > I'm -1 (non-binding) on shipping this RC in this state, because users
>>> > expect to be able to use the inbuilt performance testing tools, and
>>> > preventing them from testing the impact of the new features using the
>>> > inbuilt tools isn't great. I made a PR for this:
>>> > https://github.com/apache/kafka/pull/3398 (the change is very small).
>>> > Happy
>>> > to make a jira as well, if that makes sense.
>>> >
>>> > Thanks
>>> >
>>> > Tom Crayford
>>> > Heroku Kafka
>>> >
>>> > On Tue, Jun 20, 2017 at 8:32 PM, Vahid S Hashemian <
>>> > vahidhashem...@us.ibm.com> wrote:
>>> >
>>> > > Hi Ismael,
>>> > >
>>> > > Thanks for running the release.
>>> > >
>>> > > Running tests ('gradlew.bat test') on my Windows 64-bit VM results in
>>> > > these checkstyle errors:
>>> > >
>>> > > :clients:checkstyleMain
>>> > > [ant:checkstyle] [ERROR]
>>> > > C:\Users\User\Downloads\kafka-0.11.0.0-src\clients\src\main\
>>> > > java\org\apache\kafka\common\protocol\Errors.java:89:1:
>>> > > Class Data Abstraction Coupling is 57 (max allowed is 20) classes
>>> > > [ApiExceptionBuilder, BrokerNotAvailableException,
>>> > > ClusterAuthorizationException, ConcurrentTransactionsException,
>>> > > ControllerMovedException, CoordinatorLoadInProgressException,
>>> > > CoordinatorNotAvailableException, CorruptRecordException,
>>> > > DuplicateSequenceNumberException, GroupAuthorizationException,
>>> > > IllegalGenerationException, IllegalSaslStateException,
>>> > > InconsistentGroupProtocolException, InvalidCommitOffsetSizeExcepti
>>> on,
>>> > > InvalidConfigurationException, InvalidFetchSizeException,
>>> > > InvalidGroupIdException, InvalidPartitionsException,
>>> > > InvalidPidMappingException, InvalidReplicaAssignmentException,
>>> > > InvalidReplicationFactorException, InvalidRequestException,
>>> > > InvalidRequiredAcksException, InvalidSessionTimeoutException,
>>> > > InvalidTimestampException, InvalidTopicException,
>>> > > InvalidTxnStateException, InvalidTxnTimeoutException,
>>> > > LeaderNotAvailableException, NetworkException,
>>> NotControllerException,
>>

Re: [VOTE] 0.11.0.0 RC1

2017-06-22 Thread Ismael Juma
Hi Tom,

We are going to do another RC to include Apurva's significant performance
improvement when transactions are enabled:

https://github.com/apache/kafka/commit/f239f1f839f8bcbd80cce2a4a8643e15d340be8e

Given that, we can also include the ProducerPerformance changes that Apurva
did to find and fix the performance issue.

In my opinion, the ProducerPerformance change alone would not be enough
reason for another RC as users can run the tool from trunk to test older
releases. In any case, this is hypothetical at this point. :)

And thanks for continuing your testing, it's very much appreciated!

Ismael

On Wed, Jun 21, 2017 at 8:03 PM, Tom Crayford  wrote:

> That looks better than mine, nice! I think the tooling matters a lot to
> the usability of the product we're shipping, being able to test out Kafka's
> features on your own hardware/setup is very important to knowing if it can
> work.
>
> On Wed, Jun 21, 2017 at 8:01 PM, Apurva Mehta  wrote:
>
>> Hi Tom,
>>
>> I actually made modifications to the produce performance tool to do real
>> transactions earlier this week as part of our benchmarking (results
>> published here: bit.ly/kafka-eos-perf). I just submitted that patch here:
>> https://github.com/apache/kafka/pull/3400/files
>>
>> I think my version is more complete since it runs the full gamut of APIs:
>> initTransactions, beginTransaction, commitTransaction. Also, it is the
>> version used for our published benchmarks.
>>
>> I am not sure that this tool is a blocker for the release though, since it
>> doesn't really affect the usability of the feature any way.
>>
>> Thanks,
>> Apurva
>>
>> On Wed, Jun 21, 2017 at 11:12 AM, Tom Crayford 
>> wrote:
>>
>> > Hi there,
>> >
>> > I'm -1 (non-binding) on shipping this RC.
>> >
>> > Heroku has carried on performance testing with 0.11 RC1. We have updated
>> > our test setup to use 0.11.0.0 RC1 client libraries. Without any of the
>> > transactional features enabled, we get slightly better performance than
>> > 0.10.2.1 with 10.2.1 client libraries.
>> >
>> > However, we attempted to run a performance test today with transactions,
>> > idempotence and consumer read_committed enabled, but couldn't, because
>> > enabling transactions requires the producer to call `initTransactions`
>> > before starting to send messages, and the producer performance tool
>> doesn't
>> > allow for that.
>> >
>> > I'm -1 (non-binding) on shipping this RC in this state, because users
>> > expect to be able to use the inbuilt performance testing tools, and
>> > preventing them from testing the impact of the new features using the
>> > inbuilt tools isn't great. I made a PR for this:
>> > https://github.com/apache/kafka/pull/3398 (the change is very small).
>> > Happy
>> > to make a jira as well, if that makes sense.
>> >
>> > Thanks
>> >
>> > Tom Crayford
>> > Heroku Kafka
>> >
>> > On Tue, Jun 20, 2017 at 8:32 PM, Vahid S Hashemian <
>> > vahidhashem...@us.ibm.com> wrote:
>> >
>> > > Hi Ismael,
>> > >
>> > > Thanks for running the release.
>> > >
>> > > Running tests ('gradlew.bat test') on my Windows 64-bit VM results in
>> > > these checkstyle errors:
>> > >
>> > > :clients:checkstyleMain
>> > > [ant:checkstyle] [ERROR]
>> > > C:\Users\User\Downloads\kafka-0.11.0.0-src\clients\src\main\
>> > > java\org\apache\kafka\common\protocol\Errors.java:89:1:
>> > > Class Data Abstraction Coupling is 57 (max allowed is 20) classes
>> > > [ApiExceptionBuilder, BrokerNotAvailableException,
>> > > ClusterAuthorizationException, ConcurrentTransactionsException,
>> > > ControllerMovedException, CoordinatorLoadInProgressException,
>> > > CoordinatorNotAvailableException, CorruptRecordException,
>> > > DuplicateSequenceNumberException, GroupAuthorizationException,
>> > > IllegalGenerationException, IllegalSaslStateException,
>> > > InconsistentGroupProtocolException, InvalidCommitOffsetSizeException,
>> > > InvalidConfigurationException, InvalidFetchSizeException,
>> > > InvalidGroupIdException, InvalidPartitionsException,
>> > > InvalidPidMappingException, InvalidReplicaAssignmentException,
>> > > InvalidReplicationFactorException, InvalidRequestException,
>> > > InvalidRequiredAcksException, InvalidSessionTimeoutException,
>> > > InvalidTimestampException, InvalidTopicException,
>> > > InvalidTxnStateException, InvalidTxnTimeoutException,
>> > > LeaderNotAvailableException, NetworkException, NotControllerException,
>> > > NotCoordinatorException, NotEnoughReplicasAfterAppendException,
>> > > NotEnoughReplicasException, NotLeaderForPartitionException,
>> > > OffsetMetadataTooLarge, OffsetOutOfRangeException,
>> > > OperationNotAttemptedException, OutOfOrderSequenceException,
>> > > PolicyViolationException, ProducerFencedException,
>> > > RebalanceInProgressException, RecordBatchTooLargeException,
>> > > RecordTooLargeException, ReplicaNotAvailableException,
>> > > SecurityDisabledException, TimeoutException, TopicAuthorizationException,
>> > > TopicExistsException, TransactionCoordinator

Re: help!Kafka failover do not work as expected in Kafka quick start tutorial

2017-06-22 Thread Hans Jespersen
Do you list all three brokers on your consumers bootstrap-server list?

-hans
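
[Editor's note: for reference, assuming the quickstart's default ports (9092/9093/9094) and
topic name, a consumer command that lists all three brokers looks like the sketch below. With
all three in the list, a consumer started while broker 0 is down can still bootstrap from the
surviving brokers.]

bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
    --from-beginning \
    --topic my-replicated-topic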

> On Jun 22, 2017, at 5:15 AM, 夏昀  wrote:
> 
> hello:
> I am trying the quickstart in the Kafka documentation, link: 
> https://kafka.apache.org/quickstart. When I moved to Step 6: Setting up a 
> multi-broker cluster, I deployed 3 Kafka broker instances. If I kill either 
> server-1 or server-2, everything goes well as the document says. But when I 
> kill the first broker, where broker.id=0, the consumer can't read the new 
> records produced by the producer. When I restart broker 0, the consumer can 
> display new messages. Why does the system work well when broker 1 or broker 2 
> is killed, but not when broker 0 is killed?
> Can you explain this for me? Thank you very much!


Re: consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
Thank you very much , Damian

^_^


> On Jun 22, 2017, at 22:43, Damian Guy  wrote:
> 
> Hi,
> Yes the key format used by a window store changelog is the same format as
> is stored in RocksDB. You can see what the format is by looking here:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
> 
> Thanks,
> Damian
> 
> On Thu, 22 Jun 2017 at 15:23 sy.pan  wrote:
> 
>> I explicitly call KTable.to(Serde<Windowed<String>>, Serdes.Long(),
>> String topic),
>> 
>> save the same data to another topic(manually created by myself), then the
>> excp is gone.
>> 
>> 
>> so does the **-changelog internal topic have a special key format? (even though the key
>> type is the same Windowed<String>)
>> 
>> 
>> 



Re: consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread Damian Guy
Hi,
Yes the key format used by a window store changelog is the same format as
is stored in RocksDB. You can see what the format is by looking here:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java

Thanks,
Damian
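
[Editor's note: for anyone who wants to read the changelog directly without hitting the
exception, one workaround is to consume the key as raw bytes and strip the store's suffix by
hand. The sketch below assumes the layout in WindowStoreUtils at the time
([serialized record key][8-byte window start timestamp][4-byte sequence number]); double-check
the suffix against the linked class for your version.]

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;

public class ChangelogKeySketch {
    // record comes from a KafkaConsumer<byte[], byte[]> using ByteArrayDeserializer for key and value
    static void printWindowedCount(ConsumerRecord<byte[], byte[]> record) {
        final int suffixBytes = 8 + 4;   // window start timestamp + sequence number (assumed layout)
        byte[] binaryKey = record.key();
        byte[] rawKey = Arrays.copyOfRange(binaryKey, 0, binaryKey.length - suffixBytes);
        String key = Serdes.String().deserializer().deserialize(record.topic(), rawKey);
        long windowStart = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - suffixBytes);
        Long count = Serdes.Long().deserializer().deserialize(record.topic(), record.value());
        System.out.println(key + " @ " + windowStart + " -> " + count);
    }
}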

On Thu, 22 Jun 2017 at 15:23 sy.pan  wrote:

> I explicitly call KTable.to(Serde<Windowed<String>>, Serdes.Long(),
> String topic),
>
> save the same data to another topic(manually created by myself), then the
> excp is gone.
>
>
> so does the **-changelog internal topic have a special key format? (even though the key
> type is the same Windowed<String>)
>
>
>


help!Kafka failover do not work as expected in Kafka quick start tutorial

2017-06-22 Thread 夏昀
hello:
I am trying the quickstart in the Kafka documentation, link: 
https://kafka.apache.org/quickstart. When I moved to Step 6: Setting up a 
multi-broker cluster, I deployed 3 Kafka broker instances. If I kill either 
server-1 or server-2, everything goes well as the document says. But when I 
kill the first broker, where broker.id=0, the consumer can't read the new 
records produced by the producer. When I restart broker 0, the consumer can display 
new messages. Why does the system work well when broker 1 or broker 2 is killed, but 
not when broker 0 is killed?
Can you explain this for me? Thank you very much!


Re: consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
I explicitly call KTable.to(Serde<Windowed<String>>, Serdes.Long(), String 
topic),

save the same data to another topic(manually created by myself), then the excp 
is gone.


so does the **-changelog internal topic have a special key format? (even though the key type 
is the same Windowed<String>)




consume ***-changelog topic encounter IllegalArgumentException: Window startMs time cannot be negative

2017-06-22 Thread sy.pan
Hi:

when calling KGroupedStream.count(Windows<W> windows, String storeName),

 the storeName-changelog topic is auto-created as an internal topic, with key type 
Windowed<String> and value type Long

I try to consume from the internal storeName-changelog topic; the code sample looks like:


final Deserializer<Windowed<String>> windowedDeserializer = new WindowedDeserializer<>(
        Serdes.String().deserializer());

final KafkaConsumer<Windowed<String>, Long> consumer = new KafkaConsumer<Windowed<String>, Long>(
        consumerProperties, windowedDeserializer, Serdes.Long().deserializer());


but the program thrown :

Exception in thread "main" 
org.apache.kafka.common.errors.SerializationException: Error deserializing 
key/value for partition ***-changelog-3 at offset 125274495
Caused by: java.lang.IllegalArgumentException: Window startMs time cannot be 
negative.

I want to know why the matching Serdes could not consume these messages.

thank you in advance.





Re: Max message size and compression

2017-06-22 Thread mayank rathi
Hello Eli,

This is from Kafka: Definitive Guide ( by Neha Narkhede , Gwen Shapira ,
and Todd Palino) , Chapter 2. Installing Kafka

"The Kafka broker limits the maximum size of a message that can be
produced, configured by the message.max.bytes parameter which defaults to
1000000, or 1 megabyte. A producer which tries to send a message larger
than this will receive an error back from the broker and the message will
not be accepted. As with all byte sizes specified on the broker, this
configuration deals with compressed message size, which means that
producers can send messages that are much larger than this value
uncompressed, provided they compress down to under the configured
message.max.bytes size."

Thanks!!
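
[Editor's note: as a rough illustration of how the pieces line up when compression is on,
per the book passage above. The values are just examples, not recommendations.]

# broker (server.properties) -- per the book, this limit is applied to the compressed message size
message.max.bytes=1000000

# producer -- enable compression; max.request.size still caps the request the producer will send
compression.type=gzip
max.request.size=1048576

# consumer -- keep the per-partition fetch size at least as large as the biggest message the broker accepts
max.partition.fetch.bytes=1048576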

On Thu, Jun 22, 2017 at 4:18 AM, Eli Jordan 
wrote:

> Thanks for the reply Mayank. Do you know if this is documented somewhere?
> I wasn't able to find mention of it.
>
> Thanks
> Eli
>
> > On 22 Jun 2017, at 05:50, mayank rathi  wrote:
> >
> > If you are compressing messages, then the size of the "compressed" message
> > should be less than what's specified in these parameters.
> >
> > On Sat, Jun 17, 2017 at 7:46 PM, Eli Jordan 
> > wrote:
> >
> >> Hi
> >>
> >> max.message.bytes controls the maximum message size the kafka server
> will
> >> process
> >>
> >> message.max.bytes controls the maximum message size the consumer will
> >> process
> >>
> >> max.request.size controls the maximum request size for the producer
> >>
> >> Whats not clear to me (and I can't find documented anywhere) is if the
> >> message size limits are imposed on compressed or uncompressed messages,
> >> when compression is enabled.
> >>
> >> Note: I'm use Kafka 0.10.2.0 if that makes any difference.
> >>
> >> Any pointers or advice on this would be greatly appreciated.
> >>
> >> Thanks
> >> Eli
> >
> >
> >
> >
>





Re: Handling 2 to 3 Million Events before Kafka

2017-06-22 Thread SenthilKumar K
Hi Barton - I think we can use the async producer with the callback API to
keep track of which events failed.

--Senthil
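
[Editor's note: a minimal sketch of that callback pattern with the 0.10/0.11 Java producer;
the topic name and the failure counter are placeholders, not part of any existing tool.]

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendSketch {
    private static final AtomicLong failedEvents = new AtomicLong();

    static void sendAsync(Producer<String, String> producer, String key, String value) {
        // "events" is a placeholder topic name
        producer.send(new ProducerRecord<>("events", key, value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // the broker did not acknowledge this event: count it, log it, or requeue it
                    failedEvents.incrementAndGet();
                }
            }
        });
    }
}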

On Thu, Jun 22, 2017 at 4:58 PM, SenthilKumar K 
wrote:

> Thanks Barton.. I'll look into these ..
>
> On Thu, Jun 22, 2017 at 7:12 AM, Garrett Barton 
> wrote:
>
>> Getting good concurrency in a webapp is more than doable.  Check out
>> these benchmarks:
>> https://www.techempower.com/benchmarks/#section=data-r14&hw=ph&test=db
>> I linked to the single query one because thats closest to a single
>> operation like you will be doing.
>>
>> I'd also note if the data delivery does not need to be guaranteed you
>> could go faster switching the web servers over to UDP and using async mode
>> on the kafka producers.
>>
>> On Wed, Jun 21, 2017 at 2:23 PM, Tauzell, Dave <
>> dave.tauz...@surescripts.com> wrote:
>>
>>> I’m not really familiar with Netty so I won’t be of much help.   Maybe
>>> try posting on a Netty forum to see what they think?
>>> -Dave
>>>
>>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>>> Sent: Wednesday, June 21, 2017 10:28 AM
>>> To: Tauzell, Dave
>>> Cc: users@kafka.apache.org; senthilec...@apache.org;
>>> d...@kafka.apache.org
>>> Subject: Re: Handling 2 to 3 Million Events before Kafka
>>>
>>> So netty would work for this case ?  I do have netty server and seems to
>>> be i'm not getting the expected results .. here is the git
>>> https://github.com/senthilec566/netty4-server , is this right
>>> implementation ?
>>>
>>> Cheers,
>>> Senthil
>>>
>>> On Wed, Jun 21, 2017 at 7:45 PM, Tauzell, Dave <
>>> dave.tauz...@surescripts.com>
>>> wrote:
>>> I see.
>>>
>>> 1.   You don’t want the 100k machines sending directly to kafka.
>>>
>>> 2.   You can only have a small number of web servers
>>>
>>> People certainly have web-servers handling over 100k concurrent
>>> connections.  See this for some examples:
>>> https://github.com/smallnest/C1000K-Servers .
>>>
>>> It seems possible with the right sort of kafka producer tuning.
>>>
>>> -Dave
>>>
>>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>>> Sent: Wednesday, June 21, 2017 8:55 AM
>>> To: Tauzell, Dave
>>> Cc: users@kafka.apache.org;
>>> senthilec...@apache.org;
>>> d...@kafka.apache.org; Senthil kumar
>>> Subject: Re: Handling 2 to 3 Million Events before Kafka
>>>
>>> Thanks Jeyhun. Yes http server would be problematic here w.r.t network ,
>>> memory ..
>>>
>>> Hi Dave ,  The problem is not with Kafka , it's all about how do you
>>> handle huge data before kafka.  I did a simple test with 5 node Kafka
>>> Cluster which gives good result ( ~950 MB/s ) ..So Kafka side i dont see a
>>> scaling issue ...
>>>
>>> All we are trying is before kafka how do we handle messages from
>>> different servers ...  Webservers can send fast to kafka but still i can
>>> handle only 50k events per second which is less for my use case.. also i
>>> can't deploy 20 webservers to handle this load. I'm looking for an option
>>> what could be the best candidate before kafka , it should be super fast in
>>> getting all and send it to kafka producer ..
>>>
>>>
>>> --Senthil
>>>
>>> On Wed, Jun 21, 2017 at 6:53 PM, Tauzell, Dave <
>>> dave.tauz...@surescripts.com>
>>> wrote:
>>> What are your configurations?
>>>
>>> - production
>>> - brokers
>>> - consumers
>>>
>>> Is the problem that web servers cannot send to Kafka fast enough or your
>>> consumers cannot process messages off of kafka fast enough?
>>> What is the average size of these messages?
>>>
>>> -Dave
>>>
>>> -Original Message-
>>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>>> Sent: Wednesday, June 21, 2017 7:58 AM
>>> To: users@kafka.apache.org
>>> Cc: senthilec...@apache.org; Senthil
>>> kumar; d...@kafka.apache.org
>>> Subject: Handling 2 to 3 Million Events before Kafka
>>>
>>> Hi Team ,   Sorry if this question is irrelevant to Kafka Group ...
>>>
>>> I have been trying to solve problem of handling 5 GB/sec ingestion.
>>> Kafka is really good candidate for us to handle this ingestion rate ..
>>>
>>>
>>> 100K machines > { Http Server (Jetty/Netty) } --> Kafka Cluster..
>>>
>>> I see the problem in Http Server where it can't handle beyond 50K events
>>> per instance ..  I'm thinking some other solution would be right choice
>>> before Kafka ..
>>>
>>> Anyone worked on similar use case and similar load ?
>>> Suggestions/Thoughts ?
>>>
>>> --Senthil

Re: Handling 2 to 3 Million Events before Kafka

2017-06-22 Thread SenthilKumar K
Thanks Barton.. I'll look into these ..

On Thu, Jun 22, 2017 at 7:12 AM, Garrett Barton 
wrote:

> Getting good concurrency in a webapp is more than doable.  Check out these
> benchmarks:
> https://www.techempower.com/benchmarks/#section=data-r14&hw=ph&test=db
> I linked to the single query one because thats closest to a single
> operation like you will be doing.
>
> I'd also note if the data delivery does not need to be guaranteed you
> could go faster switching the web servers over to UDP and using async mode
> on the kafka producers.
>
> On Wed, Jun 21, 2017 at 2:23 PM, Tauzell, Dave <
> dave.tauz...@surescripts.com> wrote:
>
>> I’m not really familiar with Netty so I won’t be of much help.   Maybe
>> try posting on a Netty forum to see what they think?
>> -Dave
>>
>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>> Sent: Wednesday, June 21, 2017 10:28 AM
>> To: Tauzell, Dave
>> Cc: users@kafka.apache.org; senthilec...@apache.org; d...@kafka.apache.org
>> Subject: Re: Handling 2 to 3 Million Events before Kafka
>>
>> So netty would work for this case ?  I do have netty server and seems to
>> be i'm not getting the expected results .. here is the git
>> https://github.com/senthilec566/netty4-server , is this right
>> implementation ?
>>
>> Cheers,
>> Senthil
>>
>> On Wed, Jun 21, 2017 at 7:45 PM, Tauzell, Dave <
>> dave.tauz...@surescripts.com> wrote:
>> I see.
>>
>> 1.   You don’t want the 100k machines sending directly to kafka.
>>
>> 2.   You can only have a small number of web servers
>>
>> People certainly have web-servers handling over 100k concurrent
>> connections.  See this for some examples:  https://github.com/smallnest/C
>> 1000K-Servers .
>>
>> It seems possible with the right sort of kafka producer tuning.
>>
>> -Dave
>>
>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>> To: Tauzell, Dave
>> Cc: users@kafka.apache.org;
>> senthilec...@apache.org;
>> d...@kafka.apache.org; Senthil kumar
>> Subject: Re: Handling 2 to 3 Million Events before Kafka
>>
>> Thanks Jeyhun. Yes http server would be problematic here w.r.t network ,
>> memory ..
>>
>> Hi Dave ,  The problem is not with Kafka , it's all about how do you
>> handle huge data before kafka.  I did a simple test with 5 node Kafka
>> Cluster which gives good result ( ~950 MB/s ) ..So Kafka side i dont see a
>> scaling issue ...
>>
>> All we are trying is before kafka how do we handle messages from
>> different servers ...  Webservers can send fast to kafka but still i can
>> handle only 50k events per second which is less for my use case.. also i
>> can't deploy 20 webservers to handle this load. I'm looking for an option
>> what could be the best candidate before kafka , it should be super fast in
>> getting all and send it to kafka producer ..
>>
>>
>> --Senthil
>>
>> On Wed, Jun 21, 2017 at 6:53 PM, Tauzell, Dave <
>> dave.tauz...@surescripts.com> wrote:
>> What are your configurations?
>>
>> - production
>> - brokers
>> - consumers
>>
>> Is the problem that web servers cannot send to Kafka fast enough or your
>> consumers cannot process messages off of kafka fast enough?
>> What is the average size of these messages?
>>
>> -Dave
>>
>> -Original Message-
>> From: SenthilKumar K [mailto:senthilec...@gmail.com]
>> To: users@kafka.apache.org
>> Cc: senthilec...@apache.org; Senthil
>> kumar; d...@kafka.apache.org
>> Subject: Handling 2 to 3 Million Events before Kafka
>>
>> Hi Team ,   Sorry if this question is irrelevant to Kafka Group ...
>>
>> I have been trying to solve problem of handling 5 GB/sec ingestion. Kafka
>> is really good candidate for us to handle this ingestion rate ..
>>
>>
>> 100K machines > { Http Server (Jetty/Netty) } --> Kafka Cluster..
>>
>> I see the problem in Http Server where it can't handle beyond 50K events
>> per instance ..  I'm thinking some other solution would be right choice
>> before Kafka ..
>>
>> Anyone worked on similar use case and similar load ? Suggestions/Thoughts
>> ?
>>
>> --Senthil
>>
>>
>>
>


Kafka 0.11.0 release

2017-06-22 Thread Raghav
Hi

Would anyone know when is the Kafka 0.11.0 scheduled to be released ?

Thanks.

-- 
Raghav


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Jan Filipiak

Hi Eno,

I am less interested in the user-facing interface and more in the actual 
implementation. Any hints on where I can follow the discussion on this? I 
still want to discuss upstreaming of KAFKA-3705 with someone.


Best Jan


On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DLS. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e, there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs,  on a per operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian




Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Eno Thereska
Note that while I agree with the initial proposal (withKeySerdes, withJoinType, 
etc), I don't agree with things like .materialize(), .enableCaching(), 
.enableLogging(). 

The former maintains the declarative DSL, while the latter breaks the declarative 
part by mixing system decisions into the DSL. I think there is a difference 
between the two proposals.

Eno

> On 22 Jun 2017, at 03:46, Guozhang Wang  wrote:
> 
> I have been thinking about reducing all these overloaded functions for
> stateful operations (there are some other places that introduces overloaded
> functions but let's focus on these only in this discussion), what I used to
> have is to use some "materialize" function on the KTables, like:
> 
> ---
> 
> // specifying the topology
> 
> KStream stream1 = builder.stream();
> KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
> sessionMerger, sessionWindows);  // do not allow to pass-in a state store
> supplier here any more
> 
> // additional specs along with the topology above
> 
> table1.materialize("queryableStoreName"); // or..
> table1.materialize("queryableStoreName").enableCaching().enableLogging();
> // or..
> table1.materialize(stateStoreSupplier); // add the metrics / logging /
> caching / windowing functionalities on top of the store, or..
> table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
> etc..
> 
> ---
> 
> But thinking about it more, I feel Damian's first proposal is better since
> my proposal would likely break the concatenation (e.g. we may not be
> able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
> want to use different specs for the intermediate filtered KTable).
> 
> 
> But since this is an incompatible change, and we are going to remove the
> compatibility annotations soon it means we only have one chance and we
> really have to make it right. So I'd call out for anyone try to rewrite
> your examples / demo code with the proposed new API and see if it feel
> natural, for example, if I want to use a different storage engine than the
> default rockDB engine how could I easily specify that with the proposed
> APIs?
> 
> Meanwhile Damian could you provide a formal set of APIs for people to
> exercise on them? Also could you briefly describe how custom storage
> engines could be swapped in with the above APIs?
> 
> 
> 
> Guozhang
> 
> 
> On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska 
> wrote:
> 
>> To make it clear, it’s outlined by Damian, I just copy pasted what he told
>> me in person :)
>> 
>> Eno
>> 
>>> On Jun 21, 2017, at 4:40 PM, Bill Bejeck  wrote:
>>> 
>>> +1 for the approach outlined above by Eno.
>>> 
>>> On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy 
>> wrote:
>>> 
 Thanks Eno.
 
 Yes i agree. We could apply this same approach to most of the operations
 where we have multiple overloads, i.e., we have a single method for each
 operation that takes the required parameters and everything else is
 specified as you have done above.
 
 On Wed, 21 Jun 2017 at 16:24 Eno Thereska 
>> wrote:
 
> (cc’ing user-list too)
> 
> Given that we already have StateStoreSuppliers that are configurable
 using
> the fluent-like API, probably it’s worth discussing the other examples
 with
> joins and serdes first since those have many overloads and are in need
>> of
> some TLC.
> 
> So following your example, I guess you’d have something like:
> .join()
>  .withKeySerdes(…)
>  .withValueSerdes(…)
>  .withJoinType(“outer”)
> 
> etc?
> 
> I like the approach since it still remains declarative and it’d reduce
 the
> number of overloads by quite a bit.
> 
> Eno
> 
>> On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:
>> 
>> Hi,
>> 
>> I'd like to get a discussion going around some of the API choices
>> we've
>> made in the DLS. In particular those that relate to stateful
>> operations
>> (though this could expand).
>> As it stands we lean heavily on overloaded methods in the API, i.e,
 there
>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and
 i
>> feel it is only going to get worse as we add more optional params. In
>> particular we've had some requests to be able to turn caching off, or
>> change log configs,  on a per operator basis (note this can be done
>> now
> if
>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>> 
>> So this is a bit of an open question. How can we change the DSL
 overloads
>> so that it flows, is simple to use and understand, and is easily
 extended
>> in the future?
>> 
>> One option would be to use a fluent API approach for providing the
> optional
>> params, so something like this:
>> 
>> groupedStream.count()
>> .withStoreName("name")
>>

Re: ticketing system Design

2017-06-22 Thread Sameer Kumar
Hi Abhimanya,

You can very well do it through Kafka, KafkaStreams and something like
redis.

I would design it to be something like this:-

1. Topic 1 - Pending tasks
2. Topic 2 - Reassigned Tasks.
3. Topic 3 - Task To Resource Mapping.

Some other components could be:-

4. Redis Hash (task progress info)
We also need to store a mapping of resources to completed task count
and in-progress task count. This helps ensure that the pipeline
remains active and that bandwidth is balanced between people who are free vs
busy (managed through the counts).

5. TaskAssigner - Kafka Streams - listens to the Pending Tasks and Reassigned
Tasks topics and decides who can be assigned a task by reading the Redis hash
(a rough topology sketch follows below).


6. Task Processor - Java/Scala program - processes the task. Probably
triggered after a resource processes the task.

7. Task Rebalancer - Java/Scala program - triggered at regular intervals to
do the reassignments.


-Sameer.
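
[Editor's note: a rough sketch of the TaskAssigner in point 5, against the 0.10.x/0.11 Streams
API. The topic names mirror the list above, and pickFreeResource() stands in for whatever
Redis-backed lookup the application uses; it is not a Kafka API.]

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class TaskAssignerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "task-assigner");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        // read both the pending and the reassigned topics as one stream of (taskId, payload)
        KStream<String, String> tasks = builder.stream("pending-tasks", "reassigned-tasks");

        // re-key each task by the resource chosen from the Redis hash
        KStream<String, String> assignments =
                tasks.map((taskId, payload) -> KeyValue.pair(pickFreeResource(), taskId));

        assignments.to("task-to-resource-mapping");

        new KafkaStreams(builder, props).start();
    }

    private static String pickFreeResource() {
        return "resource-1";   // placeholder for the Redis-backed lookup described above
    }
}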

On Wed, Jun 21, 2017 at 7:55 AM, Tarun Garg  wrote:

> need some more input on this.
>
>
> Kafka is a queue; it doesn't take any action.
>
>
> The sender (producer) sends data to Kafka and the consumer pulls data from the
> Kafka queue, so there is no assignment of data to any particular consumer.
>
> If a process/human can't take any action, then Kafka can't help in this
> case.
>
> Hope this answers your question.
>
> 
> From: Abhimanyu Nagrath 
> Sent: Monday, June 19, 2017 8:01 PM
> To: users@kafka.apache.org
> Subject: Re: ticketing system Design
>
> Hi ,
>
> Can anyone suggest me where I can get the answer for these type of
> questions?
>
>
> Regards,
> Abhimanyu
>
> On Thu, Jun 8, 2017 at 6:49 PM, Abhimanyu Nagrath <
> abhimanyunagr...@gmail.com> wrote:
>
> > Hi ,
> >
> > Can Apache Kafka along with Storm be used to design a ticketing
> > system?
> > By ticketing system, I mean that there are millions of tasks stored in
> > Kafka queues and there are processes/humans to take some actions on the
> > tasks. There are some constraints: the same task should not be assigned to
> > two processes/humans, and if a task flows to a process/human and no action
> > is performed, it should be reassigned.
> > I am not sure whether this can be solved using Kafka. Any help is
> > appreciated.
> >
> >
> >
> > Regards,
> > Abhimanyu
> >
>


Re: Max message size and compression

2017-06-22 Thread Eli Jordan
Thanks for the reply Mayank. Do you know if this is documented somewhere? I 
wasn't able to find mention of it.

Thanks
Eli

> On 22 Jun 2017, at 05:50, mayank rathi  wrote:
> 
> If you are compressing messages, then the size of the "compressed" message should be
> less than what's specified in these parameters.
> 
> On Sat, Jun 17, 2017 at 7:46 PM, Eli Jordan 
> wrote:
> 
>> Hi
>> 
>> max.message.bytes controls the maximum message size the kafka server will
>> process
>> 
>> message.max.bytes controls the maximum message size the consumer will
>> process
>> 
>> max.request.size controls the maximum request size for the producer
>> 
>> Whats not clear to me (and I can't find documented anywhere) is if the
>> message size limits are imposed on compressed or uncompressed messages,
>> when compression is enabled.
>> 
>> Note: I'm use Kafka 0.10.2.0 if that makes any difference.
>> 
>> Any pointers or advice on this would be greatly appreciated.
>> 
>> Thanks
>> Eli
> 
> 
> 
> 