Re: Consumer group describe issue

2017-12-27 Thread sahil aggarwal
@TedYu
From 0.10.0.

@Bill
Thanks for the pointer. Will follow the steps mentioned in the doc.

On 28 December 2017 at 07:39, Ted Yu  wrote:

> Which branch was the patch generated from ?
> When I tried to apply the patch:
>
> 6 out of 7 hunks FAILED -- saving rejects to file
> core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala.rej
>
> FYI
>
> On Mon, Dec 25, 2017 at 9:45 PM, sahil aggarwal 
> wrote:
>
> > Attached the patch. If someone can review it, that will be very helpful.
> >
> >
> >
> > Thanks,
> > sahil
> >
> > On 23 December 2017 at 13:11, sahil aggarwal 
> > wrote:
> >
> >> my bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
> >>
> >> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
> >>
> >> On 23 December 2017 at 13:07, Ted Yu  wrote:
> >>
> >>> Sahil:
> >>> I did a quick search in 0.11.0 branch and trunk for getEndOffsets but
> >>> didn't find any occurrence.
> >>>
> >>> Mind giving us the location (and class) where getEndOffsets is called ?
> >>>
> >>> Thanks
> >>>
> >>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal <sahil.ag...@gmail.com>
> >>> wrote:
> >>>
> >>> > Fixed it by some code change in ConsumerGroupCommand.scala. Possible to
> >>> > push it upstream for 0.10.*?
> >>> >
> >>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has
> >>> > requestTimeoutMs instead of Long.MAX_VALUE.
> >>> >
> >>> > On 23 December 2017 at 02:46, Matthias J. Sax wrote:
> >>> >
> >>> > > Your observation is correct. KafkaConsumer.position() is a blocking
> >>> > > call. It's a known issue that there is no configurable timeout value.
> >>> > >
> >>> > > I am not aware of any workaround.
> >>> > >
> >>> > >
> >>> > > -Matthias
> >>> > >
> >>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
> >>> > > > Hi,
> >>> > > >
> >>> > > > Facing an issue where *kafka-consumer-groups.sh --describe * gets
> >>> > > > stuck if one of the partitions is unavailable, i.e. no leader. Going
> >>> > > > through some code, I found that it does the following to get the log
> >>> > > > end offset:
> >>> > > > * Create consumer
> >>> > > > * For each partition
> >>> > > >* assign partition
> >>> > > >* seek to end
> >>> > > >* get position
> >>> > > >
> >>> > > > The issue is that KafkaConsumer.position() uses
> >>> > > > Fetcher.retrieveOffsetsByTimes() internally, which is called with
> >>> > > > timeout Long.MAX_VALUE, and it gets stuck in a loop there.
> >>> > > >
> >>> > > >
> >>> > > > Any pointers?
> >>> > > >
> >>> > > >
> >>> > > > *Version*: 0.10.0.1
> >>> > > >
> >>> > > >
> >>> > > > Thanks,
> >>> > > > Sahil
> >>> > > >
> >>> > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
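
For illustration, here is a minimal sketch (Java, assuming a 0.10.1+ client;
topic and bootstrap settings are hypothetical) contrasting the two approaches
discussed in this thread. The assign/seekToEnd/position pattern can block
indefinitely when a partition has no leader, while endOffsets() is bounded by
the request timeout, as noted above:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class EndOffsetsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            List<TopicPartition> partitions =
                Arrays.asList(new TopicPartition("my-topic", 0));

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Pattern used by ConsumerGroupCommand in 0.10.0: position()
                // retries internally with timeout Long.MAX_VALUE, so it hangs
                // forever if the partition has no leader.
                //   consumer.assign(partitions);
                //   consumer.seekToEnd(partitions);
                //   long end = consumer.position(partitions.get(0));

                // Bounded alternative (available from 0.10.1): endOffsets()
                // issues a ListOffsets request limited by request.timeout.ms,
                // so an unavailable partition should surface as an error
                // rather than an indefinite hang.
                Map<TopicPartition, Long> ends = consumer.endOffsets(partitions);
                ends.forEach((tp, off) -> System.out.println(tp + " -> " + off));
            }
        }
    }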


1 to N transformers in Kafka Connect

2017-12-27 Thread Ziliang Chen
Hi,

May I ask if it is possible to transform 1 Kafka record into many Kafka
Connect records? I know 1:1 transformation is supported in Kafka Connect, but
it appears to me there are quite a few use cases which require a 1:N
transformation.

Thank you very much!

-- 
Regards, Zi-Liang

Mail: zlchen@gmail.com
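
For context, the single-message transform (SMT) API in Connect is one record
in, at most one record out, which is where the 1:1 constraint comes from. A
minimal sketch of the interface shape (identity behavior; class name
hypothetical):

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    // apply() returns a single record (or null to drop it), so a stock
    // transform cannot fan one record out into many.
    public class IdentityTransform<R extends ConnectRecord<R>>
            implements Transformation<R> {

        @Override
        public R apply(R record) {
            return record; // one record in, at most one record out
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // the identity transform takes no configuration
        }

        @Override
        public void close() {
            // nothing to release
        }
    }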


Re: kafka-client throwing IllegalStateException on calling wait

2017-12-27 Thread Ted Yu
Have you seen
https://examples.javacodegeeks.com/java-basics/exceptions/java-lang-illegalmonitorstateexception-how-to-solve-illegalmonitorstateexception/
?

You didn't include the whole code w.r.t. shadowKafkaProducer.
If you need more help, please consider including more of your code.

Cheers
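
For reference, Object.wait() may only be called by a thread that holds the
object's monitor; calling it outside a synchronized block on that same object
throws java.lang.IllegalMonitorStateException, which is the exception in the
trace below. A minimal sketch of the legal pattern (names hypothetical):

    public class WaitExample {

        private final Object lock = new Object();
        private boolean stopped = false;

        // wait() is legal here because the monitor of 'lock' is held.
        public void awaitStop() throws InterruptedException {
            synchronized (lock) {
                while (!stopped) { // guard against spurious wakeups
                    lock.wait();
                }
            }
        }

        public void signalStop() {
            synchronized (lock) {
                stopped = true;
                lock.notifyAll();
            }
        }
    }

Note also that KafkaProducer.close() already blocks until previously sent
records complete, so waiting on the producer object before closing it is
probably unnecessary.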

On Wed, Dec 27, 2017 at 5:32 AM, Debraj Manna 
wrote:

> Cross-posting from stackoverflow
>
> Kafka Client 0.10.0.0 is throwing the below IllegalMonitorStateException
> whenever I am calling wait()
>
> ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 - POST /collectortosaasservlet] - InterruptedException in producer.wait. for cloning
> java.lang.IllegalMonitorStateException: null
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Object.wait(Object.java:502)
>         at com.van.grid.uploadHandler.UploadHandler.stopCloning(UploadHandler.java:481)
>
> The relevant code looks like below
>
> shadowKafkaProducer = new KafkaProducer<>(kafkaShadowProdConf);
> ...
> public void stopCloning() {
>     logger.info("Going to stop cloning");
>     if (shadowKafkaProducer != null) {
>         try {
>             shadowKafkaProducer.wait();
>         } catch (Exception e) {
>             logger.error("InterruptedException in producer.wait. for cloning", e);
>         }
>         shadowKafkaProducer.close();
>         shadowKafkaProducer = null;
>     }
>     logger.info("Stopped cloning");
> }
>
> shadowKafkaProducer.wait() is line number 481 in the above stacktrace.
>
> Can someone let me know why this exception is thrown, and can I ignore it?
>


Re: kafka-client throwing IllegalStateException on calling wait

2017-12-27 Thread Debraj Manna
Anyone have any thoughts?

On Wed, Dec 27, 2017 at 7:02 PM, Debraj Manna 
wrote:

> Cross-posting from stackoverflow
> 
>
> Kafka Client 0.10.0.0 is throwing the below IllegalMonitorStateException
> whenever I am calling wait()
>
> ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 - POST /collectortosaasservlet] - InterruptedException in producer.wait. for cloning
> java.lang.IllegalMonitorStateException: null
>         at java.lang.Object.wait(Native Method)
>         at java.lang.Object.wait(Object.java:502)
>         at com.van.grid.uploadHandler.UploadHandler.stopCloning(UploadHandler.java:481)
>
> The relevant code looks like below
>
> shadowKafkaProducer = new KafkaProducer<>(kafkaShadowProdConf);
> ...
> public void stopCloning() {
>     logger.info("Going to stop cloning");
>     if (shadowKafkaProducer != null) {
>         try {
>             shadowKafkaProducer.wait();
>         } catch (Exception e) {
>             logger.error("InterruptedException in producer.wait. for cloning", e);
>         }
>         shadowKafkaProducer.close();
>         shadowKafkaProducer = null;
>     }
>     logger.info("Stopped cloning");
> }
>
> shadowKafkaProducer.wait() is line number 481 in the above stacktrace.
>
> Can someone let me know why this exception is thrown, and can I ignore it?
>


Re: Consumer group describe issue

2017-12-27 Thread Ted Yu
Which branch was the patch generated from ?
When I tried to apply the patch:

6 out of 7 hunks FAILED -- saving rejects to file
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala.rej

FYI

On Mon, Dec 25, 2017 at 9:45 PM, sahil aggarwal 
wrote:

> Attached the patch. If someone can review it, that will be very helpful.
>
>
>
> Thanks,
> sahil
>
> On 23 December 2017 at 13:11, sahil aggarwal 
> wrote:
>
>> my bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
>>
>> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
>>
>> On 23 December 2017 at 13:07, Ted Yu  wrote:
>>
>>> Sahil:
>>> I did a quick search in 0.11.0 branch and trunk for getEndOffsets but
>>> didn't find any occurrence.
>>>
>>> Mind giving us the location (and class) where getEndOffsets is called ?
>>>
>>> Thanks
>>>
>>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal 
>>> wrote:
>>>
>>> > Fixed it by some code change in ConsumerGroupCommand.scala. Possible to
>>> > push it upstream for 0.10.*?
>>> >
>>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has
>>> > requestTimeoutMs instead of Long.MAX_VALUE.
>>> >
>>> > On 23 December 2017 at 02:46, Matthias J. Sax 
>>> > wrote:
>>> >
>>> > > Your observation is correct. KafkaConsumer.position() is a blocking
>>> > > call. It's a known issue that there is no configurable timeout value.
>>> > >
>>> > > I am not aware of any workaround.
>>> > >
>>> > >
>>> > > -Matthias
>>> > >
>>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
>>> > > > Hi,
>>> > > >
>>> > > > Facing an issue where *kafka-consumer-groups.sh --describe * gets
>>> > > > stuck if one of the partitions is unavailable, i.e. no leader. Going
>>> > > > through some code, I found that it does the following to get the log
>>> > > > end offset:
>>> > > >
>>> > > > * Create consumer
>>> > > > * For each partition
>>> > > >* assign partition
>>> > > >* seek to end
>>> > > >* get position
>>> > > >
>>> > > > The issue is that KafkaConsumer.position() uses
>>> > > > Fetcher.retrieveOffsetsByTimes() internally, which is called with
>>> > > > timeout Long.MAX_VALUE, and it gets stuck in a loop there.
>>> > > >
>>> > > >
>>> > > > Any pointers?
>>> > > >
>>> > > >
>>> > > > *Version*: 0.10.0.1
>>> > > >
>>> > > >
>>> > > > Thanks,
>>> > > > Sahil
>>> > > >
>>> > >
>>> > >
>>> >
>>>
>>
>>
>


Re: Consumer group describe issue

2017-12-27 Thread Bill Bejeck
Sahil,

Thanks for the patch.

In order to have your patch reviewed, please look over the contribution
guidelines outlined here: https://kafka.apache.org/contributing

Thanks,
Bill

On Tue, Dec 26, 2017 at 12:45 AM, sahil aggarwal 
wrote:

> Attached the patch. If someone can review it, that will be very helpful.
>
>
>
> Thanks,
> sahil
>
> On 23 December 2017 at 13:11, sahil aggarwal 
> wrote:
>
>> my bad, it's consumer.endOffsets in ConsumerGroupCommand.scala.
>>
>> https://github.com/apache/kafka/blob/ef97ed7ee5cb883a30245ea4f77ef99d4db15373/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L467
>>
>> On 23 December 2017 at 13:07, Ted Yu  wrote:
>>
>>> Sahil:
>>> I did a quick search in 0.11.0 branch and trunk for getEndOffsets but
>>> didn't find any occurrence.
>>>
>>> Mind giving us the location (and class) where getEndOffsets is called ?
>>>
>>> Thanks
>>>
>>> On Fri, Dec 22, 2017 at 11:29 PM, sahil aggarwal 
>>> wrote:
>>>
>>> > Fixed it by some code change in ConsumerGroupCommand.scala. Possible to
>>> > push it upstream for 0.10.*?
>>> >
>>> > It seems to be fixed in 0.11.*, where it uses getEndOffsets(), which has
>>> > requestTimeoutMs instead of Long.MAX_VALUE.
>>> >
>>> > On 23 December 2017 at 02:46, Matthias J. Sax 
>>> > wrote:
>>> >
>>> > > Your observation is correct. KafkaConsumer.position() is a blocking
>>> > > call. It's a known issue that there is no configurable timeout value.
>>> > >
>>> > > I am not aware of any workaround.
>>> > >
>>> > >
>>> > > -Matthias
>>> > >
>>> > > On 12/21/17 6:05 AM, sahil aggarwal wrote:
>>> > > > Hi,
>>> > > >
>>> > > > Facing an issue where *kafka-consumer-groups.sh --describe * gets
>>> > > > stuck if one of the partitions is unavailable, i.e. no leader. Going
>>> > > > through some code, I found that it does the following to get the log
>>> > > > end offset:
>>> > > >
>>> > > > * Create consumer
>>> > > > * For each partition
>>> > > >* assign partition
>>> > > >* seek to end
>>> > > >* get position
>>> > > >
>>> > > > The issue is that KafkaConsumer.position() uses
>>> > > > Fetcher.retrieveOffsetsByTimes() internally, which is called with
>>> > > > timeout Long.MAX_VALUE, and it gets stuck in a loop there.
>>> > > >
>>> > > >
>>> > > > Any pointers?
>>> > > >
>>> > > >
>>> > > > *Version*: 0.10.0.1
>>> > > >
>>> > > >
>>> > > > Thanks,
>>> > > > Sahil
>>> > > >
>>> > >
>>> > >
>>> >
>>>
>>
>>
>


log.deleted

2017-12-27 Thread Wim Van Leuven
Hello,

When Kafka evicts a segment, it renames the files by appending .deleted. In
the configuration you can also specify how long to keep these files lying
around.

What's the actual use of keeping the .deleted files around? Can you
'restore' the deleted data from these files after, e.g., increasing the
retention periods or sizes?

Thanks for the explanation!
-wim
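
For reference, the broker setting being described here is presumably
log.segment.delete.delay.ms, which controls how long the renamed *.deleted
files linger before a background task physically removes them:

    # server.properties: delay (in ms) between renaming a segment to *.deleted
    # and actually removing it from the filesystem (default: 60000)
    log.segment.delete.delay.ms=60000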


Re: Kafka Streams - max.poll.interval.ms defaults to Integer.MAX_VALUE

2017-12-27 Thread Matthias J. Sax
It would -- however, this is not an issue for KafkaStreams, as we make
sure the thread is either still alive or properly shut down. Thus, if an
error happens and the thread really dies, KafkaStreams ensures that the
heartbeat thread is stopped, and thus a rebalance would not block
forever, as the instance drops out of the group via session timeout.

And as long as a KafkaStreams instance is restoring state, the rebalance
should block by design.

Note that all of this no longer holds for newer versions of KafkaStreams.


-Matthias



On 12/27/17 6:55 AM, Javier Holguera wrote:
> Hi Matthias,
> 
> Thanks for your answer. It makes a lot of sense.
> 
> Just a follow-up question. KIP-62 says: "we give the client as much as 
> max.poll.interval.ms to handle a batch of records, this is also the maximum 
> time before a consumer can be expected to rejoin the group in the worst 
> case". Does it mean that a broker would wait Integer.MAX_VALUE for a client 
> to report in the event of a rebalance? That sounds improbable, so I must be 
> missing something.
> 
> Thanks.
> 
> 
> -Original Message-
> From: Matthias J. Sax [mailto:matth...@confluent.io] 
> Sent: Friday, December 22, 2017 9:13 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Streams - max.poll.interval.ms defaults to 
> Integer.MAX_VALUE
> 
> The value was changed to make Streams applications robust against large state
> restore phases during rebalance.
> 
> I.e., it is targeted to exactly "fix" 2. If an application needs to restore
> state, this state restore might take longer than the max.poll.interval.ms
> parameter and thus, even if the application is in a good state, it drops out
> of the group. This results in rebalance "storms". The consumer default of 30
> seconds is too small for most applications and thus we set it to MAX_VALUE --
> if you have a good estimate of the max expected state restore time, you can
> safely set the timeout to an appropriate value.
> 
> Note: in Kafka 0.11 and 1.0, Kafka Streams state restore was largely improved,
> so it should not be an issue there to reduce the timeout accordingly.
> 
> 
> -Matthias
> 
> On 12/20/17 7:14 AM, Javier Holguera wrote:
>> Hi,
>>
>> According to the documentation, "max.poll.interval.ms" defaults to 
>> Integer.MAX_VALUE for Kafka Streams since 0.10.2.1.
>>
>> Considering that the "max.poll.interval.ms" is:
>>
>>   1.  A "processing timeout" to control an upper limit for processing a 
>> batch of records AND
>>   2.  The rebalance timeout that the client will communicate to the 
>> broker, according to KIP-62
>>
>> How do Kafka Streams applications detect slow consumers that are taking too 
>> long to process a batch of messages? What replaces the existing mechanism 
>> with a smaller "max.poll.interval.ms" where the application will willingly 
>> abandon the consumer group when the timeout expires?
>>
>> From the broker perspective, what does it mean that the application 
>> communicates a "rebalance timeout" of Integer.MAX_VALUE? I can imagine it 
>> will not wait for that long in a rebalance. What happens then?
>>
>> Thanks.
>>
> 
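
For illustration, a minimal sketch of the advice above: overriding the Streams
default of Integer.MAX_VALUE with an estimate of the worst-case restore time
(the application id, servers, and the 10-minute value are all hypothetical):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class RestoreAwareConfig {

        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Override the consumer's max.poll.interval.ms with an estimated
            // worst-case state-restore time instead of Integer.MAX_VALUE.
            props.put(
                StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                10 * 60 * 1000);
            return props;
        }
    }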





Fw: Number of Processes Kafka Server spawns

2017-12-27 Thread Sven Ludwig
Answering myself:

Kafka Server spawns only 1 process.

htop by default also shows threads, which was misleading. It can be turned off
by pressing H.

https://unix.stackexchange.com/questions/10362/why-does-htop-show-more-process-than-ps

Regards,
Sven
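
As a quick check (the PID placeholder is hypothetical), the process and thread
counts can be compared from the shell:

    # one broker process
    ps -Alf | grep -c '[k]afka'
    # number of threads inside that process (what htop lists by default)
    ps -o nlwp= -p <kafka-pid>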
 

Sent: Wednesday, 27 December 2017 at 16:17
From: "Sven Ludwig" 
To: users@kafka.apache.org
Subject: Number of Processes Kafka Server spawns
Hi,

does Kafka ever launch processes in addition to the Kafka server process?

I have a setup with CentOS plus Docker plus Kafka Server container based on the 
image of Confluent Platform 4.0.0

In ps -Alf there is only one Kafka process, but in htop I can see many.

Regards,
Sven


Number of Processes Kafka Server spawns

2017-12-27 Thread Sven Ludwig
Hi,

does Kafka ever launch processes in addition to the Kafka server process?

I have a setup with CentOS plus Docker plus Kafka Server container based on the 
image of Confluent Platform 4.0.0

In ps -Alf there is only one Kafka process, but in htop I can see many.

Regards,
Sven


RE: Kafka Streams - max.poll.interval.ms defaults to Integer.MAX_VALUE

2017-12-27 Thread Javier Holguera
Hi Matthias,

Thanks for your answer. It makes a lot of sense.

Just a follow-up question. KIP-62 says: "we give the client as much as 
max.poll.interval.ms to handle a batch of records, this is also the maximum 
time before a consumer can be expected to rejoin the group in the worst case". 
Does it mean that a broker would wait Integer.MAX_VALUE for a client to report 
in the event of a rebalance? That sounds improbable, so I must be missing 
something.

Thanks.


-Original Message-
From: Matthias J. Sax [mailto:matth...@confluent.io] 
Sent: Friday, December 22, 2017 9:13 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams - max.poll.interval.ms defaults to Integer.MAX_VALUE

The value was changed to make Streams applications robust against large state
restore phases during rebalance.

I.e., it is targeted to exactly "fix" 2. If an application needs to restore
state, this state restore might take longer than the max.poll.interval.ms
parameter and thus, even if the application is in a good state, it drops out of
the group. This results in rebalance "storms". The consumer default of 30
seconds is too small for most applications and thus we set it to MAX_VALUE --
if you have a good estimate of the max expected state restore time, you can
safely set the timeout to an appropriate value.

Note: in Kafka 0.11 and 1.0, Kafka Streams state restore was largely improved,
so it should not be an issue there to reduce the timeout accordingly.


-Matthias

On 12/20/17 7:14 AM, Javier Holguera wrote:
> Hi,
> 
> According to the documentation, "max.poll.interval.ms" defaults to 
> Integer.MAX_VALUE for Kafka Streams since 0.10.2.1.
> 
> Considering that the "max.poll.interval.ms" is:
> 
>   1.  A "processing timeout" to control an upper limit for processing a batch 
> of records AND
>   2.  The rebalance timeout that the client will communicate to the 
> broker, according to KIP-62
> 
> How do Kafka Streams applications detect slow consumers that are taking too 
> long to process a batch of messages? What replaces the existing mechanism 
> with a smaller "max.poll.interval.ms" where the application will willingly 
> abandon the consumer group when the timeout expires?
> 
> From the broker perspective, what does it mean that the application 
> communicates a "rebalance timeout" of Integer.MAX_VALUE? I can imagine it 
> will not wait for that long in a rebalance. What happens then?
> 
> Thanks.
> 



kafka-client throwing IllegalStateException on calling wait

2017-12-27 Thread Debraj Manna
Cross-posting from stackoverflow


Kafka Client 0.10.0.0 is throwing the below IllegalMonitorStateException
whenever I am calling wait()

ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 - POST /collectortosaasservlet] - InterruptedException in producer.wait. for cloning
java.lang.IllegalMonitorStateException: null
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at com.van.grid.uploadHandler.UploadHandler.stopCloning(UploadHandler.java:481)

The relevant code looks like below

shadowKafkaProducer = new KafkaProducer<>(kafkaShadowProdConf);
...
public void stopCloning() {
    logger.info("Going to stop cloning");
    if (shadowKafkaProducer != null) {
        try {
            shadowKafkaProducer.wait();
        } catch (Exception e) {
            logger.error("InterruptedException in producer.wait. for cloning", e);
        }
        shadowKafkaProducer.close();
        shadowKafkaProducer = null;
    }
    logger.info("Stopped cloning");
}

shadowKafkaProducer.wait() is line number 481 in the above stacktrace.

Can someone let me know why this exception is thrown, and can I ignore it?