Kafka streams - regarding WordCountInteractiveQueriesExample

2017-11-29 Thread Giridhar Addepalli
Hi,

I am a newbie to Kafka Streams.

I tried the example below:

https://github.com/confluentinc/kafka-streams-examples/blob/4.0.x/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java

http://localhost:7070/state/instances

[
{
"host": "localhost",
"port": 7070,
"storeNames": [
"windowed-word-count",
"word-count"
]
},
{
"host": "localhost",
"port": 7071,
"storeNames": [
"windowed-word-count",
"word-count"
]
}
]



I was able to query for the count of a given word:

http://localhost:7070/state/keyvalue/word-count/hello

{
"key": "hello",
"value": 444
}



http://localhost:7071/state/keyvalue/word-count/world

{
"key": "world",
"value": 906
}


But I am not able to replicate the following (highlighted) part of the
advertised behavior in the example:

* 5) Use your browser to hit the REST endpoint of the app instance you
started in step 3 to query
* the state managed by this application.  Note: If you are running
multiple app instances, you can
* query them arbitrarily -- if an app instance cannot satisfy a query
itself, it will fetch the
* results from the other instances.


For example, the following gave a 404 error:
http://localhost:7070/state/keyvalue/word-count/world

HTTP ERROR: 404

Problem accessing /state/keyvalue/word-count/world. Reason:

Not Found

Please let me know where my expectations are going wrong.
Please note that I have tried both the 3.3.0-post & 4.0.0-post branches.

Thanks,
Giridhar.
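For context on step 5: the remote-query behavior relies on the queried instance
looking up which application instance owns a given key via the Streams metadata
API and forwarding the HTTP request when the key lives elsewhere. Below is a
minimal sketch of that lookup only, not the example's actual code; the class
name, the hard-coded localhost:7070 (standing in for this instance's own
application.server value), and the `streams` instance are assumptions.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

// Hypothetical helper, only for illustration.
public class WordCountLookup {

    // Resolve a word-count query: answer locally if this instance owns the key,
    // otherwise report which instance the HTTP request should be forwarded to.
    static String lookup(KafkaStreams streams, String word) {
        // Ask Streams which instance hosts the partition that owns this key.
        StreamsMetadata owner =
                streams.metadataForKey("word-count", word, Serdes.String().serializer());

        // "localhost"/7070 stand in for this instance's own application.server value.
        boolean ownedLocally = owner.host().equals("localhost") && owner.port() == 7070;
        if (!ownedLocally) {
            // Remote key: the REST layer should proxy the request to
            // http://<host>:<port>/state/keyvalue/word-count/<word>
            return "forward to " + owner.host() + ":" + owner.port();
        }

        // Local key: read it straight from the local state store.
        ReadOnlyKeyValueStore<String, Long> store =
                streams.store("word-count", QueryableStoreTypes.<String, Long>keyValueStore());
        return word + " = " + store.get(word);
    }
}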


Re: KAFKA-5413 and 0.10.x

2017-11-29 Thread Ismael Juma
Just a quick note.

On Tue, Nov 28, 2017 at 8:09 PM, Philippe Laflamme 
wrote:

> Upgrading our cluster to a 0.11 version is certainly an option, but is a
> risky one given that it could introduce more bugs (especially since it has
> a new storage format and a whole slew of new features). We would much
> rather upgrade to a 0.10.x release to only obtain this fix that we need.
>

A few points:

1. 0.11.0.2 also fixes a large number of bugs (including some critical
ones) so I would say that there is also risk in sticking with 0.10.2.x.
2. You can continue to use the old message format even if you upgrade to
0.11.0.x.
3. 0.11.0.x has had two bug fix releases so it's reasonably safe to upgrade
to by now. As usual, you'd want to do your own testing though.

Ismael


Kafka beginner: problem with interpreting logs, troubleshooting running in docker swarm

2017-11-29 Thread Ron Arts
Hi,

I am running one Kafka broker against a 3-node ZooKeeper cluster, all in
AWS. I am using Kafka Tool on it, which works flawlessly against my local
Kafka (in Docker).

Kafka Tool can connect and comes up with information, but I can't create
topics, even though it reports success.

On the other hand, creating topics from inside the Kafka container seems
to work, and listing them does work.

In the Docker container I do get a java.io.EOFException, which I have
trouble understanding. I upped the Kafka log level to DEBUG; the log is
attached below, hope it's not too long.

I hope someone can take a quick glance at it, and point me in the right
direction.

Thanks,
Ron Arts



- Kafka log here --
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: New Monit id: 8bf4a30eab07533f3c2a0e86c003ec09
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: Stored in '/opt/kafka/.monit.id'
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: Starting Monit 5.24.0 daemon with http interface at [*]:2812
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: '5623226517f6' Monit 5.24.0 started
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: 'kafka-service' process is not running
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: 'kafka-service' trying to restart
Nov 29 11:52:22 ec2-IP.AWS.X.X 5623226517f6[2643]: 'kafka-service' start: '/opt/kafka/bin/kafka-service.sh start'
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: [2017-11-29 11:52:23,991] INFO KafkaConfig values:
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011advertised.host.name = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011advertised.listeners = PLAINTEXT://IP.AWS.X.X:9092
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011advertised.port = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011alter.config.policy.class.name = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011authorizer.class.name =
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011auto.create.topics.enable = true
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011auto.leader.rebalance.enable = true
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011background.threads = 10
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011broker.id = 0
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011broker.id.generation.enable = true
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011broker.rack = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011compression.type = producer
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011connections.max.idle.ms = 60
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011controlled.shutdown.enable = true
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011controlled.shutdown.max.retries = 3
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011controlled.shutdown.retry.backoff.ms = 5000
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011controller.socket.timeout.ms = 3
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011create.topic.policy.class.name = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011default.replication.factor = 1
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011delete.records.purgatory.purge.interval.requests = 1
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011delete.topic.enable = false
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011fetch.purgatory.purge.interval.requests = 1000
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011group.initial.rebalance.delay.ms = 3000
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011group.max.session.timeout.ms = 30
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011group.min.session.timeout.ms = 6000
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011host.name =
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011inter.broker.listener.name = null
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011inter.broker.protocol.version = 1.0-IV0
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011leader.imbalance.check.interval.seconds = 300
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011leader.imbalance.per.broker.percentage = 10
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011listeners = PLAINTEXT://0.0.0.0:9092
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.backoff.ms = 15000
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.dedupe.buffer.size = 134217728
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.delete.retention.ms = 8640
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.enable = true
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.io.buffer.load.factor = 0.9
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]: #011log.cleaner.io.buffer.size = 524288
Nov 29 11:52:23 ec2-IP.AWS.X.X 5623226517f6[2643]

Re: Kafka streams - regarding WordCountInteractiveQueriesExample

2017-11-29 Thread Bill Bejeck
Giridhar,

Thanks for reporting this, I'll take a look.

On Wed, Nov 29, 2017 at 5:37 AM, Giridhar Addepalli 
wrote:

> Hi,
>
> I am newbie to Kafka streams.
>
> Tried below example :
>
> https://github.com/confluentinc/kafka-streams-
> examples/blob/4.0.x/src/main/java/io/confluent/examples/
> streams/interactivequeries/WordCountInteractiveQueriesExample.java
>
> http://localhost:7070/state/instances
>
> [
> {
> "host": "localhost",
> "port": 7070,
> "storeNames": [
> "windowed-word-count",
> "word-count"
> ]
> },
> {
> "host": "localhost",
> "port": 7071,
> "storeNames": [
> "windowed-word-count",
> "word-count"
> ]
> }
> ]
>
>
>
> I was able to query for count of a given word :
>
> http://localhost:7070/state/keyvalue/word-count/hello
>
> {
> "key": "hello",
> "value": 444
> }
>
>
>
> http://localhost:7071/state/keyvalue/word-count/world
>
> {
> "key": "world",
> "value": 906
> }
>
>
> But i am not able to replicate following part(highlighted) of advertised
> behavior in the example :
>
> * 5) Use your browser to hit the REST endpoint of the app instance you
> started in step 3 to query
> * the state managed by this application.  Note: If you are running
> multiple app instances, you can
> * query them arbitrarily -- if an app instance cannot satisfy a query
> itself, it will fetch the
> * results from the other instances.
>
>
> For example , following gave 404 error
> http://localhost:7070/state/keyvalue/word-count/world
>
> HTTP ERROR: 404
>
> Problem accessing /state/keyvalue/word-count/world. Reason:
>
> Not Found
>
> Please let me know where my expectations are going wrong.
> Please note that i have tried both 3.3.0-post & 4.0.0-post branches.
>
> Thanks,
> Giridhar.
>


Plans to extend streams?

2017-11-29 Thread Adrienne Kole
Hi,

The purpose of this email is to get an overall sense of the future plans
for the Streams library.

The main question is: will it remain a single-threaded application in the
long run, serving microservice use cases, or are there plans to extend it
into a multi-node execution framework with less Kafka dependency?

Currently, each Streams node 'talks' to the Kafka cluster, and the nodes
can talk to each other only indirectly, again through Kafka. However,
especially if Kafka is not in the same network as the Streams nodes (and
this can happen even if they are in the same network), this causes high
network overhead and inefficiency.

One solution for this (bypassing the network overhead) is to deploy the
Streams nodes on the Kafka cluster to ensure data locality. However, this
is not recommended, as the library and Kafka can affect each other's
performance, and Streams does not necessarily have to know the internal
data partitioning of Kafka.

Another solution would be to extend the Streams library to have a common
runtime. IMO, preserving the current selling points of Streams (like
dynamic scale in/out) while adding this kind of extension would be a very
good improvement.

So my question is: will Streams, in the short or long run, extend its use
cases to massive and efficient stream processing (and compete with Spark),
or stay and strengthen its current position?

Cheers,
Adrienne


Re: Plans to extend streams?

2017-11-29 Thread Wim Van Leuven
What you are actually asking is whether Kafka Streams should be reimplemented
as Apache Storm?
-wim

On Wed, 29 Nov 2017 at 15:10 Adrienne Kole  wrote:

> Hi,
>
> The purpose of this email is to get overall intuition for the future  plans
> of streams library.
>
> The main question is that, will it be a single threaded application in the
> long run and serve microservices use-cases, or are there any plans to
> extend it to multi-node execution framework with less kafka dependency.
>
> Currently, each streams node 'talks' with kafka cluster and they can
> indirectly talk with each other again through kafka. However, especially if
> kafka is not in the same network with streams nodes (actually this can
> happen if they are in the same network as well) this will cause high
> network overhead and inefficiency.
>
> One solution for this (bypassing network overhead) is to deploy streams
> node on kafka cluster to ensure the data locality. However, this is not
> recommended as the library and kafka can affect each other's performance
> and  streams does not necessarily have to know the internal data
> partitioning of kafka.
>
> Another solution would be extending streams library to have a common
> runtime. IMO, preserving the current selling points of streams (like
> dynamic scale in/out) with this kind of extensions can be very good
> improvement.
>
> So my question is that, will streams in the long/short run, will extend its
> use-cases to massive and efficient stream processing (and compete with
> spark) or stay and strengthen its current position?
>
> Cheers,
> Adrienne
>


Re: Plans to extend streams?

2017-11-29 Thread Adrienne Kole
Not necessarily.

I would avoid the term "reimplemented". Btw, Apache Storm is not also the
best (streaming) system that can utilize the network and it does not
support runtime scale in/out (at least by design).
So, can streams preserve its current selling points (ex:dynamicity) while
introducing a new communication protocol, which will include the current
one (streams-kafka-streams) and the new one (streams-streams)? Are there
plans over this?



On Wed, Nov 29, 2017 at 3:56 PM, Wim Van Leuven <
wim.vanleu...@highestpoint.biz> wrote:

> What you are actually asking is if Kafka Streams should be reimplemented as
> Apache Storm?
> -wim
>
> On Wed, 29 Nov 2017 at 15:10 Adrienne Kole 
> wrote:
>
> > Hi,
> >
> > The purpose of this email is to get overall intuition for the future
> plans
> > of streams library.
> >
> > The main question is that, will it be a single threaded application in
> the
> > long run and serve microservices use-cases, or are there any plans to
> > extend it to multi-node execution framework with less kafka dependency.
> >
> > Currently, each streams node 'talks' with kafka cluster and they can
> > indirectly talk with each other again through kafka. However, especially
> if
> > kafka is not in the same network with streams nodes (actually this can
> > happen if they are in the same network as well) this will cause high
> > network overhead and inefficiency.
> >
> > One solution for this (bypassing network overhead) is to deploy streams
> > node on kafka cluster to ensure the data locality. However, this is not
> > recommended as the library and kafka can affect each other's performance
> > and  streams does not necessarily have to know the internal data
> > partitioning of kafka.
> >
> > Another solution would be extending streams library to have a common
> > runtime. IMO, preserving the current selling points of streams (like
> > dynamic scale in/out) with this kind of extensions can be very good
> > improvement.
> >
> > So my question is that, will streams in the long/short run, will extend
> its
> > use-cases to massive and efficient stream processing (and compete with
> > spark) or stay and strengthen its current position?
> >
> > Cheers,
> > Adrienne
> >
>


Re: Plans to extend streams?

2017-11-29 Thread Jan Filipiak

Hey,

you are making some wrong assumptions here.
Kafka Streams is in no way single-threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how Kafka Streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan


On 29.11.2017 15:10, Adrienne Kole wrote:

Hi,

The purpose of this email is to get overall intuition for the future  plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne





Kafka & Canary Release

2017-11-29 Thread Yuval Alon
Hello,
We are using Apache Kafka 0.10.2, which we understand does not support “Canary
Release”.
Is it in your pipeline, and if so, when is it going to be released?
Explanation of the issue:
We hold two different versions of a microservice that subscribe to the same
topic. We would like each of them to receive only the messages that match some
condition, for example:
1.  Messages with user=foo go to the microservice instance of version 1
2.  Messages with user<>foo go to the microservice instance of version 2

The problem is how to route messages to the right instances.


Yuval Alon
amdocs technology – Digital
Mobile: +972-52-6148937
Office:   +972-9-7789641
email:   yuva...@amdocs.com




Re: Plans to extend streams?

2017-11-29 Thread Adrienne Kole
Hi,

Perhaps you misunderstood the focus of the post, or I did not explain it
properly. I am not claiming that Streams is limited to a single node.
Although a whole topology instance can be limited to a single node (each
node runs the full topology), that is something else.
Also, I think the "moving 100s of GB of data per day" claim is orthogonal,
as that is not big/fast enough to reason from.

The thing is that, for some use cases, the streams-kafka-streams connection
can be a bottleneck. Yes, if I have 40 GB/s or InfiniBand network bandwidth,
this might not be an issue.

Consider a simple topology with operators A->B->C, where B forces a
repartition. The Streams nodes are s1 (A) and s2 (B, C), and Kafka resides
on cluster k, which might be on a different network switch.
So, rather than transferring data k->s1->s2, we make a round trip
k->s1->k->s2. If we know that s1 and s2 are in the same network and data
transfer between them is fast, we should not have to go through another
intermediate layer.


Thanks.
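For concreteness, here is a sketch of such an A->B->C topology where B forces a
repartition; the topic names, application id, and re-keying logic are
hypothetical. The selectKey/groupByKey step is what makes Streams write the
re-keyed records back to an internal repartition topic on the Kafka cluster
before the aggregation reads them, i.e. the k->s1->k->s2 round trip described
above.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class RepartitionRoundTrip {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // A: consume the source topic from the Kafka cluster (k -> s1).
        KStream<String, String> source = builder.stream("input");

        // B: re-keying invalidates the existing partitioning, so Streams writes the
        // re-keyed records to an internal "...-repartition" topic (s1 -> k) and the
        // aggregation reads them back from Kafka (k -> s2).
        KTable<String, Long> counts = source
                .selectKey((key, value) -> value)
                .groupByKey()
                .count();

        // C: publish the result.
        counts.toStream().to("output", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-round-trip");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}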



On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak 
wrote:

> Hey,
>
> you making some wrong assumptions here.
> Kafka Streams is in no way single threaded or
> limited to one physical instance.
> Having connectivity issues to your brokers is IMO
> a problem with the deployment and not at all
> with how kafka streams is designed and works.
>
> Kafka Streams moves hundreds of GB per day for us.
>
> Hope this helps.
>
> Best Jan
>
>
>
> On 29.11.2017 15:10, Adrienne Kole wrote:
>
>> Hi,
>>
>> The purpose of this email is to get overall intuition for the future
>> plans
>> of streams library.
>>
>> The main question is that, will it be a single threaded application in the
>> long run and serve microservices use-cases, or are there any plans to
>> extend it to multi-node execution framework with less kafka dependency.
>>
>> Currently, each streams node 'talks' with kafka cluster and they can
>> indirectly talk with each other again through kafka. However, especially
>> if
>> kafka is not in the same network with streams nodes (actually this can
>> happen if they are in the same network as well) this will cause high
>> network overhead and inefficiency.
>>
>> One solution for this (bypassing network overhead) is to deploy streams
>> node on kafka cluster to ensure the data locality. However, this is not
>> recommended as the library and kafka can affect each other's performance
>> and  streams does not necessarily have to know the internal data
>> partitioning of kafka.
>>
>> Another solution would be extending streams library to have a common
>> runtime. IMO, preserving the current selling points of streams (like
>> dynamic scale in/out) with this kind of extensions can be very good
>> improvement.
>>
>> So my question is that, will streams in the long/short run, will extend
>> its
>> use-cases to massive and efficient stream processing (and compete with
>> spark) or stay and strengthen its current position?
>>
>> Cheers,
>> Adrienne
>>
>>
>


Kafka Topic Exists Exception - On upgrading to 1.0

2017-11-29 Thread Debraj Manna
Hi

I am trying to upgrade a single-node Kafka broker from 0.10 to the latest 1.0.

The steps followed


   1. Stopped Kafka Broker
   2. Replaced bin/, libs & site-docs/ with the latest
   3. Started Kafka

But I am seeing the exception below in the logs. Does anyone have any thoughts?
How can I get around this?

2017-11-29 16:21:36.591 [main-EventThread] ClientCnxn [INFO] EventThread
shut down
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/kafka/common/errors/TopicExistsException
at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$createStream$2.apply(KafkaSystemAdmin.scala:442)
at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$createStream$2.apply(KafkaSystemAdmin.scala:440)
at
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
at
org.apache.samza.system.kafka.KafkaSystemAdmin.createStream(KafkaSystemAdmin.scala:422)
at
org.apache.samza.system.kafka.KafkaSystemAdmin.createCoordinatorStream(KafkaSystemAdmin.scala:337)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:88)
at org.apache.samza.job.JobRunner$.doOperation(JobRunner.scala:52)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:47)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.errors.TopicExistsException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 9 more


Re: Kafka & Canary Release

2017-11-29 Thread Wim Van Leuven
Isn't that simply a matter of changing your key to route the element to the
right topic for one or the other microservice?
-w
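
A minimal sketch of that idea with Kafka Streams (written against the 1.0 API;
the topic names, the user=foo predicate on the key, and default String serdes
are assumptions): split the incoming topic by the condition and let each
microservice version subscribe to its own branch.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class CanaryRouting {

    // Route records from one input topic to a per-version topic based on a condition.
    @SuppressWarnings("unchecked")
    static void addRouting(StreamsBuilder builder) {
        KStream<String, String> events = builder.stream("events");

        // Predicates are evaluated in order; the first match wins.
        KStream<String, String>[] branches = events.branch(
                (key, value) -> "foo".equals(key),   // user=foo  -> version 1
                (key, value) -> true);               // everything else -> version 2

        branches[0].to("events-v1");   // subscribed to by microservice version 1
        branches[1].to("events-v2");   // subscribed to by microservice version 2
    }
}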

On Wed, 29 Nov 2017 at 17:09 Yuval Alon  wrote:

> Hello,
> We are using Apache Kafka 0.10.2, which we understand is not supporting
> “Canary Release”
> Is it in your pipeline, if so, when it’s going to be released?
> Explanation of the issue:
> We hold two different versions of micro services that subscribes to the
> same topic. We would like to get only message that match some condition to
> each of them, for example:
> 1.  Message with user=foo to micro service instance of version 1
> 2.  Message with user<>foo to micro service instance of version 2
>
> The problem is how to route to right instances?
>
>
> Yuval Alon
> amdocs technology – Digital
> Mobile: +972-52-6148937 <+972%2052-614-8937>
> Office:   +972-9-7789641 <+972%209-778-9641>
> email:   yuva...@amdocs.com
>


Too many open files in kafka 0.9

2017-11-29 Thread REYMOND Jean-max (BPCE-IT - SYNCHRONE TECHNOLOGIES)
We have a cluster with 3 brokers and Kafka 0.9.0.1. One week ago, we decided to 
adjust log.retention.hours from 10 days to 2 days. We stopped and restarted the 
cluster and it was OK. But one broker accumulated more and more data every day, 
and two days later it crashed with the message "too many open files"; lsof 
returned 7400 open files. We adjusted the limit to 1 and it crashed again. So, in 
our data repository, we removed all the data and started again, and after a few 
minutes the cluster was OK. But now, after 6 hours, the two valid brokers have 
72 GB and the other broker has 90 GB. lsof -p xxx returns 1030 and it is growing 
continuously. I am sure that tomorrow morning we will have a crash.

In the server.log of the broken broker,

[2017-11-29 17:28:51,360] INFO Rolled new log segment for 
'__consumer_offsets-27' in 1 ms. (kafka.log.Log)
[2017-11-29 17:31:28,836] INFO Rolled new log segment for 
'__consumer_offsets-8' in 1 ms. (kafka.log.Log)
[2017-11-29 17:35:22,100] INFO Rolled new log segment for 
'__consumer_offsets-12' in 1 ms. (kafka.log.Log)
[2017-11-29 17:37:55,984] INFO Rolled new log segment for 
'__consumer_offsets-11' in 1 ms. (kafka.log.Log)
[2017-11-29 17:38:30,600] INFO [Group Metadata Manager on Broker 2]: Removed 0 
expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-11-29 17:39:55,836] INFO Rolled new log segment for 
'__consumer_offsets-16' in 1 ms. (kafka.log.Log)
[2017-11-29 17:43:38,300] INFO Rolled new log segment for 
'__consumer_offsets-48' in 1 ms. (kafka.log.Log)
[2017-11-29 17:44:21,110] INFO Rolled new log segment for 
'__consumer_offsets-36' in 1 ms. (kafka.log.Log)
[2017-11-29 17:48:30,600] INFO [Group Metadata Manager on Broker 2]: Removed 0 
expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

And at the same time, on a valid broker:

[2017-11-29 17:44:46,704] INFO Deleting index 
/pfic/kafka/data/kafka_data/__consumer_offsets-48/002686063378.index.deleted
 (kafka.log.OffsetIndex)
[2017-11-29 17:44:47,341] INFO Deleting segment 2687254936 from log 
__consumer_offsets-48. (kafka.log.Log)
[2017-11-29 17:44:47,376] INFO Deleting index 
/pfic/kafka/data/kafka_data/__consumer_offsets-48/002687254936.index.deleted
 (kafka.log.OffsetIndex)
[2017-11-29 17:45:32,991] INFO Deleting segment 0 from log 
__consumer_offsets-36. (kafka.log.Log)
[2017-11-29 17:45:32,991] INFO Deleting segment 1769617973 from log 
__consumer_offsets-36. (kafka.log.Log)
[2017-11-29 17:45:32,993] INFO Deleting index 
/pfic/kafka/data/kafka_data/__consumer_offsets-36/.index.deleted
 (kafka.log.OffsetIndex)
[2017-11-29 17:45:32,993] INFO Deleting index 
/pfic/kafka/data/kafka_data/__consumer_offsets-36/001769617973.index.deleted
 (kafka.log.OffsetIndex)
[2017-11-29 17:45:33,593] INFO Deleting segment 1770704579 from log 
__consumer_offsets-36. (kafka.log.Log)
[2017-11-29 17:45:33,627] INFO Deleting index 
/pfic/kafka/data/kafka_data/__consumer_offsets-36/001770704579.index.deleted
 (kafka.log.OffsetIndex)
[2017-11-29 17:45:58,394] INFO [Group Metadata Manager on Broker 0]: Removed 0 
expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

So the broken broker never deletes a segment. Of course, the three brokers have 
the same configuration.
What is happening?
Thanks for your advice,


Jean-Max REYMOND
BPCE Infogérance & Technologies



RE: Kafka & Canary Release

2017-11-29 Thread Assaf Katz
Hi,
It isn't so simple: only microservice 1 knows about it. If microservice 2 sends 
to microservice 1, it has no idea that microservice 1 was updated. So we need to 
route it on the message hub - Kafka.
Thanks,
Assaf

From: Wim Van Leuven [mailto:wim.vanleu...@highestpoint.biz]
Sent: Wednesday, 29 November 2017 18:47
To: users@kafka.apache.org
Cc: Assaf Katz ; Zeevik Liak 
Subject: Re: Kafka & Canary Release

Isn't that simply a matter of changing your key to route the element to the 
right topic for 1 or the other microservice?
-w

On Wed, 29 Nov 2017 at 17:09 Yuval Alon 
mailto:yuva...@amdocs.com>> wrote:
Hello,
We are using Apache Kafka 0.10.2, which we understand is not supporting “Canary 
Release”
Is it in your pipeline, if so, when it’s going to be released?
Explanation of the issue:
We hold two different versions of micro services that subscribes to the same 
topic. We would like to get only message that match some condition to each of 
them, for example:
1.  Message with user=foo to micro service instance of version 1
2.  Message with user<>foo to micro service instance of version 2

The problem is how to route to right instances?


Yuval Alon
amdocs technology – Digital
Mobile: +972-52-6148937
Office:   +972-9-7789641
email:   
yuva...@amdocs.com>




Kafka 1.0.0 - Creating a topic and waiting for it to be ready

2017-11-29 Thread Cosmin Marginean
Hello everyone.

We are currently using AdminUtils.createTopic to create topics at runtime
and we were wondering if it's possible to implement some check to validate
that the topic is "ready" for producer/consumer wiring.

Thank you.
Cos.
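
One possible check, sketched with the Java AdminClient that ships with Kafka 1.0
rather than AdminUtils; the bootstrap address, method name, and polling interval
are assumptions, not an established recipe. The idea is to poll describeTopics
after creating the topic until it reports the expected partition count and every
partition has an elected leader.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicReadyCheck {

    // Poll until the topic is visible with the expected partition count and every
    // partition has an elected leader, or give up after timeoutMs.
    static boolean waitUntilReady(String topic, int expectedPartitions, long timeoutMs)
            throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                try {
                    TopicDescription description = admin
                            .describeTopics(Collections.singleton(topic))
                            .all().get().get(topic);
                    boolean ready = description.partitions().size() == expectedPartitions
                            && description.partitions().stream().allMatch(p -> p.leader() != null);
                    if (ready) {
                        return true;
                    }
                } catch (Exception e) {
                    // e.g. UnknownTopicOrPartitionException: metadata not propagated yet.
                }
                Thread.sleep(100);
            }
            return false;
        }
    }
}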


Re: Too many open files in kafka 0.9

2017-11-29 Thread Ted Yu
There is KAFKA-3317 which is still open.

Have you seen this?

http://search-hadoop.com/m/Kafka/uyzND1KvOlt1p5UcE?subj=Re+Brokers+is+down+by+java+io+IOException+Too+many+open+files+

On Wed, Nov 29, 2017 at 8:55 AM, REYMOND Jean-max (BPCE-IT - SYNCHRONE
TECHNOLOGIES)  wrote:

> We have a cluster with 3 brokers and kafka 0.9.0.1. One week ago, we
> decide to adjust log.retention.hours from 10 days to 2 days. Stop and go
> the cluster and it is ok. But for one broker, we have every day more and
> more datas and two days later crash with message too many open files. lsof
> return 7400 opened files. We adjust to 1 and crash again. So, in our
> data repository, we remove all the datas and run again and after a few
> minutes, cluster is OK. But now, after atfer 6 hours, the two valid brokers
> have 72 GB and the other broker has 90 GB. lsof -p xxx returns 1030 and it
> is growing continously. I am sure that tomorrow morning, we will have a
> crash.
>
> In the server.log of the broken broker,
>
> [2017-11-29 17:28:51,360] INFO Rolled new log segment for
> '__consumer_offsets-27' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:31:28,836] INFO Rolled new log segment for
> '__consumer_offsets-8' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:35:22,100] INFO Rolled new log segment for
> '__consumer_offsets-12' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:37:55,984] INFO Rolled new log segment for
> '__consumer_offsets-11' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:38:30,600] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2017-11-29 17:39:55,836] INFO Rolled new log segment for
> '__consumer_offsets-16' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:43:38,300] INFO Rolled new log segment for
> '__consumer_offsets-48' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:44:21,110] INFO Rolled new log segment for
> '__consumer_offsets-36' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:48:30,600] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
> And in the same time on a valid broker
>
> [2017-11-29 17:44:46,704] INFO Deleting index
> /pfic/kafka/data/kafka_data/__consumer_offsets-48/
> 002686063378.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:44:47,341] INFO Deleting segment 2687254936 from log
> __consumer_offsets-48. (kafka.log.Log)
> [2017-11-29 17:44:47,376] INFO Deleting index
> /pfic/kafka/data/kafka_data/__consumer_offsets-48/
> 002687254936.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:32,991] INFO Deleting segment 0 from log
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:32,991] INFO Deleting segment 1769617973 from log
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:32,993] INFO Deleting index
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> .index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:32,993] INFO Deleting index
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> 001769617973.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:33,593] INFO Deleting segment 1770704579 from log
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:33,627] INFO Deleting index
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> 001770704579.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:58,394] INFO [Group Metadata Manager on Broker 0]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
> So, the broken broker never delete a segment. Of course, the three brokers
> have the same configuration.
> What's happen ?
> Thanks for your advices,
>
>
> Jean-Max REYMOND
> BPCE Infogérance & Technologies
>
>


My kafka docker image

2017-11-29 Thread Christian F. Gonzalez Di Antonio
Hi everyone, I would like to share my recently created Kafka Docker image with
you.

I made this Docker image because I wanted to keep the main focus on Kafka
configuration in my development environment, and also because none of the
other images available was easy to understand (how to use) beyond Kafka's
own configuration.

It has not been tested on Kubernetes yet, but I expect to do that soon.


Feel free to let me know your feedback on the GitHub repository.

Regards,
Christian


Re: Plans to extend streams?

2017-11-29 Thread Matthias J. Sax
We had some discussion about whether we can/should replace the
re-partitioning topic with a direct network connection between instances.
It's a tricky problem though, with many strings attached... Thus, it comes
with pros and cons and it's still unclear what the exact trade-off is.

It might happen, but it's unclear atm if or when. There is no concrete road
map. But as an open-source project, we rely on user feedback. Thus, this
idea just got one more +1 :)


-Matthias

On 11/29/17 8:26 AM, Adrienne Kole wrote:
> Hi,
> 
> You misunderstood the focus of the post perhaps or I could not explain
> properly. I am not claiming the streams is limited to single node.
> Although the whole topology instance can be limited to a single node (each
> node run all topology), this is sth else.
> Also, I think that "moving 100s of GB data per day" claim is orthogonal
> and as this is not big/fast/ enough to reason.
> 
> The thing is that, for some use-cases streams-kafka-streams connection can
> be a bottleneck.  Yes, if I have 40GB/s or infiniband network bandwidth
> this might not be an issue.
> 
> Consider a simple topology with operators A>B->C. (B forces to re-partition)
>  Streams nodes are s1(A), s2 (B,C) and kafka resides on cluster k, which
> might be in different network switch.
> So, rather than transferring data k->s1->s2, we make a round trip
> k->s1->k->s2. If we know that s1 and s2 are in the same network and data
> transfer is fast between two, we should not go through another intermediate
> layer.
> 
> 
> Thanks.
> 
> 
> 
> On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak 
> wrote:
> 
>> Hey,
>>
>> you making some wrong assumptions here.
>> Kafka Streams is in no way single threaded or
>> limited to one physical instance.
>> Having connectivity issues to your brokers is IMO
>> a problem with the deployment and not at all
>> with how kafka streams is designed and works.
>>
>> Kafka Streams moves hundreds of GB per day for us.
>>
>> Hope this helps.
>>
>> Best Jan
>>
>>
>>
>> On 29.11.2017 15:10, Adrienne Kole wrote:
>>
>>> Hi,
>>>
>>> The purpose of this email is to get overall intuition for the future
>>> plans
>>> of streams library.
>>>
>>> The main question is that, will it be a single threaded application in the
>>> long run and serve microservices use-cases, or are there any plans to
>>> extend it to multi-node execution framework with less kafka dependency.
>>>
>>> Currently, each streams node 'talks' with kafka cluster and they can
>>> indirectly talk with each other again through kafka. However, especially
>>> if
>>> kafka is not in the same network with streams nodes (actually this can
>>> happen if they are in the same network as well) this will cause high
>>> network overhead and inefficiency.
>>>
>>> One solution for this (bypassing network overhead) is to deploy streams
>>> node on kafka cluster to ensure the data locality. However, this is not
>>> recommended as the library and kafka can affect each other's performance
>>> and  streams does not necessarily have to know the internal data
>>> partitioning of kafka.
>>>
>>> Another solution would be extending streams library to have a common
>>> runtime. IMO, preserving the current selling points of streams (like
>>> dynamic scale in/out) with this kind of extensions can be very good
>>> improvement.
>>>
>>> So my question is that, will streams in the long/short run, will extend
>>> its
>>> use-cases to massive and efficient stream processing (and compete with
>>> spark) or stay and strengthen its current position?
>>>
>>> Cheers,
>>> Adrienne
>>>
>>>
>>
> 





Re: Plans to extend streams?

2017-11-29 Thread Guozhang Wang
Hello Adrienne,

I think your suggested feature is to use not only Kafka for inter-process
communication but also, configurably, TCP directly, right?

A few people have asked about this before, especially about not using
Kafka for repartitioning (think: shuffling in the batch world) but instead
going through TCP between processes. Though this is doable, I'd point out
that it may have many side effects, such as:

1) back pressure: the Streams library does not need to worry about back
pressure at all since all communication channels are persistent (Kafka
topics); with TCP you need to face the back-pressure issue again.
2) exactly-once semantics: transactional messaging is leveraged by Streams
to achieve EOS, and extending to TCP means that we need to add more gears
to handle TCP data loss / duplicates (e.g. other frameworks have been
using buffers with epoch boundaries to do that).
3) state snapshots: imagine you are shutting down your app; we then need
to make sure all in-flight TCP messages are drained, because otherwise we
are not certain whether the committed offsets are valid.



Guozhang


On Wed, Nov 29, 2017 at 8:26 AM, Adrienne Kole 
wrote:

> Hi,
>
> You misunderstood the focus of the post perhaps or I could not explain
> properly. I am not claiming the streams is limited to single node.
> Although the whole topology instance can be limited to a single node (each
> node run all topology), this is sth else.
> Also, I think that "moving 100s of GB data per day" claim is orthogonal
> and as this is not big/fast/ enough to reason.
>
> The thing is that, for some use-cases streams-kafka-streams connection can
> be a bottleneck.  Yes, if I have 40GB/s or infiniband network bandwidth
> this might not be an issue.
>
> Consider a simple topology with operators A>B->C. (B forces to
> re-partition)
>  Streams nodes are s1(A), s2 (B,C) and kafka resides on cluster k, which
> might be in different network switch.
> So, rather than transferring data k->s1->s2, we make a round trip
> k->s1->k->s2. If we know that s1 and s2 are in the same network and data
> transfer is fast between two, we should not go through another intermediate
> layer.
>
>
> Thanks.
>
>
>
> On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak 
> wrote:
>
> > Hey,
> >
> > you making some wrong assumptions here.
> > Kafka Streams is in no way single threaded or
> > limited to one physical instance.
> > Having connectivity issues to your brokers is IMO
> > a problem with the deployment and not at all
> > with how kafka streams is designed and works.
> >
> > Kafka Streams moves hundreds of GB per day for us.
> >
> > Hope this helps.
> >
> > Best Jan
> >
> >
> >
> > On 29.11.2017 15:10, Adrienne Kole wrote:
> >
> >> Hi,
> >>
> >> The purpose of this email is to get overall intuition for the future
> >> plans
> >> of streams library.
> >>
> >> The main question is that, will it be a single threaded application in
> the
> >> long run and serve microservices use-cases, or are there any plans to
> >> extend it to multi-node execution framework with less kafka dependency.
> >>
> >> Currently, each streams node 'talks' with kafka cluster and they can
> >> indirectly talk with each other again through kafka. However, especially
> >> if
> >> kafka is not in the same network with streams nodes (actually this can
> >> happen if they are in the same network as well) this will cause high
> >> network overhead and inefficiency.
> >>
> >> One solution for this (bypassing network overhead) is to deploy streams
> >> node on kafka cluster to ensure the data locality. However, this is not
> >> recommended as the library and kafka can affect each other's performance
> >> and  streams does not necessarily have to know the internal data
> >> partitioning of kafka.
> >>
> >> Another solution would be extending streams library to have a common
> >> runtime. IMO, preserving the current selling points of streams (like
> >> dynamic scale in/out) with this kind of extensions can be very good
> >> improvement.
> >>
> >> So my question is that, will streams in the long/short run, will extend
> >> its
> >> use-cases to massive and efficient stream processing (and compete with
> >> spark) or stay and strengthen its current position?
> >>
> >> Cheers,
> >> Adrienne
> >>
> >>
> >
>



-- 
-- Guozhang


Re: My kafka docker image

2017-11-29 Thread Christian F. Gonzalez Di Antonio
On Wed, Nov 29, 2017 at 9:09 PM, Christian F. Gonzalez Di Antonio <
christian...@gmail.com> wrote:

> uhh, so sorry, I forgot it.
>
> Docker Hub: https://hub.docker.com/r/christiangda/kafka/
>
> Github: https://github.com/christiangda/docker-kafka
>
> Regards,
>
> Christian
>
>
>
> On Wed, Nov 29, 2017 at 8:01 PM, Jeremy Hansen  wrote:
>
>> Christian, I didn’t see your github link.
>>
>> Thanks
>>
>> On Nov 29, 2017, at 1:44 PM, Christian F. Gonzalez Di Antonio <
>> christian...@gmail.com> wrote:
>>
>> Hi everyone, I would like to share my recently kafka's docker image with
>> you.
>>
>> I made this docker image because I try to get the major focus of kafka
>> configuration in my development environment and also because any of the
>> others images available was not easy to understand (how to use) outside
>> kafka itself configuration.
>>
>> it was not tested on Kubernates, but I expect to do that soon.
>>
>>
>> feel free to let me know your feedback on the github's repository
>>
>> Regards,
>> Christian
>>
>>


Lost messages and messed up offsets

2017-11-29 Thread Tom van den Berge
I'm using Kafka 0.10.0.

I'm reading messages from a single topic (20 partitions), using 4 consumers
(one group), using a standard java consumer with default configuration,
except for the key and value deserializer, and a group id; no other
settings.

We've been experiencing a serious problem a few times now, after a large
burst of messages (75000) have been posted to the topic. The consumer lag
(as reported by Kafka's kafka-consumer-groups.sh) immediately shows a huge
lag, which is expected. The consumers start processing the messages, which
is expected to take them at least 30 minutes. In the mean time, more
messages are posted to the topic, but at a "normal" rate, which the
consumers normally handle easily. The problem is that the reported consumer
lag is not decreasing at all. After some 30 minutes, it has even increased
slightly. This would mean that the consumers are not able to process the
backlog at all, which is extremely unlikely.

After a restart of all consumer applications, something really surprising
happens: the lag immediately drops to nearly 0! It is technically
impossible that the consumers really processed all messages in a matter of
seconds. Manual verification showed that many messages were not processed
at all; they seem to have disappeared somehow. So it seems that restarting
the consumers somehow messed up the offset (I think).

On top of that, I noticed that the reported lag shows seemingly impossible
figures. During the time that the lag was not decreasing, before the
restart of the consumers, the "current offset" that was reported for some
partitions decreased. To my knowledge, that is impossible.

Does anyone have an idea on how this could have happened?


Re: java.lang.IllegalStateException: Correlation id for response () does not match request ()

2017-11-29 Thread Aarti Gupta
https://issues.apache.org/jira/browse/KAFKA-4669?focusedCommentId=16271727&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16271727
Can we reopen this issue?

We saw this on the consumer in production today. We are on 0.11.0.1.

ERROR c.v.v.h.m.k.KafkaEventConsumerDelegate- Error fetching next record
java.lang.Exception: Error fetching next new record from kafka queue
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:121)
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEvent(KafkaEventConsumerDelegate.java:64)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEvent(EventListenerAdapter.java:76)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEventAndAck(EventListenerAdapter.java:94)
at
com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter$1.run(EventListenerAdapter.java:125)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response
(386681) does not match request (386680), request header:
{api_key=9,api_version=3,correlation_id=386680,client_id=consumer-36}

at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
at
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:477)
at
org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1346)
at
com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:86)
... 5 common frames omitted

On Mon, Mar 6, 2017 at 9:44 AM, Ismael Juma  wrote:

> Hi Mickael,
>
> This looks to be the same as KAFKA-4669. In theory, this should never
> happen and it's unclear when/how it can happen. Not sure if someone has
> investigated it in more detail.
>
> Ismael
>
> On Mon, Mar 6, 2017 at 5:15 PM, Mickael Maison 
> wrote:
>
> > Hi,
> >
> > In one of our clusters, some of our clients occasionally see this
> > exception:
> > java.lang.IllegalStateException: Correlation id for response (4564)
> > does not match request (4562)
> > at org.apache.kafka.clients.NetworkClient.correlate(
> > NetworkClient.java:486)
> > at org.apache.kafka.clients.NetworkClient.parseResponse(
> > NetworkClient.java:381)
> > at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> > NetworkClient.java:449)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:229)
> > at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:134)
> > at java.lang.Thread.run(Unknown Source)
> >
> > We've also seen it from consumer poll() and commit()
> >
> > Usually the response's correlation id is off by just 1 or 2 (like
> > above) but we've also seen it off by a few hundreds:
> > java.lang.IllegalStateException: Correlation id for response (742)
> > does not match request (174)
> > at org.apache.kafka.clients.NetworkClient.correlate(
> > NetworkClient.java:486)
> > at org.apache.kafka.clients.NetworkClient.parseResponse(
> > NetworkClient.java:381)
> > at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
> > NetworkClient.java:449)
> > at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:269)
> > at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.
> > clientPoll(ConsumerNetworkClient.java:360)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> > at org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> > commitOffsetsSync(ConsumerCoordinator.java:426)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.
> > commitSync(KafkaConsumer.java:1059)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.
> > commitSync(KafkaConsumer.java:1027)
> >
> > When this happens, all subsequent responses are also shifted:
> > java.lang.IllegalStateException: Correlation id for response (743)
> > does not match request (742)
> > java.

RE: [EXTERNAL] - Lost messages and messed up offsets

2017-11-29 Thread Isabelle Giguère
Hi;

With the default configuration, your consumers are set with auto.offset.reset=latest.
So on restart, the consumers start reading from the offset of 0 minutes ago, not the 
offset of 30 minutes ago (or whatever the lag was).

https://kafka.apache.org/documentation/#configuration
auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset 
does not exist anymore on the server (e.g. because that data has been deleted):
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for 
the consumer's group
anything else: throw exception to the consumer.

For the "current offset" that seems to decrease, I have no idea.

Isabelle Giguère
Computational Linguist and Java Developer
Linguiste informaticienne et développeur Java

_
Open Text
The Content Experts

-Original Message-
From: Tom van den Berge [mailto:tom.vandenbe...@gmail.com] 
Sent: 29 novembre 2017 17:16
To: users@kafka.apache.org
Subject: [EXTERNAL] - Lost messages and messed up offsets

I'm using Kafka 0.10.0.

I'm reading messages from a single topic (20 partitions), using 4 consumers 
(one group), using a standard java consumer with default configuration, except 
for the key and value deserializer, and a group id; no other settings.

We've been experiencing a serious problem a few times now, after a large burst 
of messages (75000) have been posted to the topic. The consumer lag (as 
reported by Kafka's kafka-consumer-groups.sh) immediately shows a huge lag, 
which is expected. The consumers start processing the messages, which is 
expected to take them at least 30 minutes. In the mean time, more messages are 
posted to the topic, but at a "normal" rate, which the consumers normally 
handle easily. The problem is that the reported consumer lag is not decreasing 
at all. After some 30 minutes, it has even increased slightly. This would mean 
that the consumers are not able to process the backlog at all, which is 
extremely unlikely.

After a restart of all consumer applications, something really surprising
happens: the lag immediately drops to nearly 0! It is technically impossible 
that the consumers really processed all messages in a matter of seconds. Manual 
verification showed that many messages were not processed at all; they seem to 
have disappeared somehow. So it seems that restarting the consumers somehow 
messed up the offset (I think).

On top of that, I noticed that the reported lag shows seemingly impossible 
figures. During the time that the lag was not decreasing, before the restart of 
the consumers, the "current offset" that was reported for some partitions 
decreased. To my knowledge, that is impossible.

Does anyone have an idea on how this could have happened?


Re: Lost messages and messed up offsets

2017-11-29 Thread Frank Lyaruu
Do you commit the received messages? Either by doing it manually or setting
enable.auto.commit and auto.commit.interval.ms?
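
For reference, a minimal sketch of a consumer that disables auto-commit and
commits explicitly after processing; the group id, topic, and processing step
are placeholders. With enable.auto.commit=false the group's offsets advance
only when commitSync() succeeds, which also makes the reported lag easier to
interpret.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit only after records have actually been processed.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only applies when the group has no committed offset for a partition.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Placeholder for the real processing.
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
                // Advance the group's committed offsets only after processing succeeded.
                consumer.commitSync();
            }
        }
    }
}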

On Wed, Nov 29, 2017 at 11:15 PM, Tom van den Berge <
tom.vandenbe...@gmail.com> wrote:

> I'm using Kafka 0.10.0.
>
> I'm reading messages from a single topic (20 partitions), using 4 consumers
> (one group), using a standard java consumer with default configuration,
> except for the key and value deserializer, and a group id; no other
> settings.
>
> We've been experiencing a serious problem a few times now, after a large
> burst of messages (75000) have been posted to the topic. The consumer lag
> (as reported by Kafka's kafka-consumer-groups.sh) immediately shows a huge
> lag, which is expected. The consumers start processing the messages, which
> is expected to take them at least 30 minutes. In the mean time, more
> messages are posted to the topic, but at a "normal" rate, which the
> consumers normally handle easily. The problem is that the reported consumer
> lag is not decreasing at all. After some 30 minutes, it has even increased
> slightly. This would mean that the consumers are not able to process the
> backlog at all, which is extremely unlikely.
>
> After a restart of all consumer applications, something really surprising
> happens: the lag immediately drops to nearly 0! It is technically
> impossible that the consumers really processed all messages in a matter of
> seconds. Manual verification showed that many messages were not processed
> at all; they seem to have disappeared somehow. So it seems that restarting
> the consumers somehow messed up the offset (I think).
>
> On top of that, I noticed that the reported lag shows seemingly impossible
> figures. During the time that the lag was not decreasing, before the
> restart of the consumers, the "current offset" that was reported for some
> partitions decreased. To my knowledge, that is impossible.
>
> Does anyone have an idea on how this could have happened?
>


Re: kafka compacted topic

2017-11-29 Thread Kane Kim
I want to confirm whether Kafka has to re-compact all log segments; as the log
grows, doesn't compaction become slower as well?

On Tue, Nov 28, 2017 at 11:33 PM, Jakub Scholz  wrote:

> There is quite a nice section on this in the documentation -
> http://kafka.apache.org/documentation/#compaction ... I think it should
> answer your questions.
>
> On Wed, Nov 29, 2017 at 7:19 AM, Kane Kim  wrote:
>
> > How does kafka log compaction work?
> > Does it compact all of the log files periodically against new changes?
> >
>