[jira] [Commented] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-11 Thread Jun He (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15659173#comment-15659173
 ] 

Jun He commented on KAFKA-4384:
---

Sure, here is the PR (https://github.com/apache/kafka/pull/2127). Thanks.

> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message
> 
>
> Key: KAFKA-4384
> URL: https://issues.apache.org/jira/browse/KAFKA-4384
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Ubuntu 12.04, AWS D2 instance
>Reporter: Jun He
> Fix For: 0.10.1.1
>
> Attachments: KAFKA-4384.patch
>
>
> We recently discovered an issue in Kafka 0.9.0.1, where a 
> ReplicaFetcherThread stopped after it received a corrupted 
> message. As the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they 
> may have a similar issue.
> Here are the system logs related to this issue.
> 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.apply - Found invalid messages during fetch for 
> partition [logs,41] offset 39021512238 error Message is corrupt (stored crc = 
> 2028421553, computed crc = 3577227678)
> 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due 
> to kafka.common.KafkaException: error processing data for partition 
> [logs,41] offset 39021512301 Caused by: java.lang.RuntimeException: Offset 
> mismatch: fetched offset = 39021512301, log end offset = 39021512238.
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238) due 
> to some blip.
> Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
>  threw an exception.
> Then, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
>  caught it and logged this error.
> However, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
>  had already updated the topic partition offset in partitionMap to the latest 
> fetched offset, so ReplicaFetcherThread skipped the batch with the corrupted 
> messages. 
> Based on 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
>  the ReplicaFetcherThread then directly fetched the next batch of messages 
> (with offset 39021512301).
> Next, ReplicaFetcherThread stopped because the log end offset (still 
> 39021512238) didn't match the fetched message (offset 39021512301).
> A quick fix is to move line 134 to be after line 138.
> It would be great to have your comments; please let me know if a Jira issue 
> is needed. Thanks.
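To make the ordering issue concrete, here is a minimal Java sketch (names and structure are illustrative only, not the actual AbstractFetcherThread code) of why advancing the fetch offset before processing the batch leads to the offset mismatch, and of the reordering fix described above:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the ordering problem; class and method names are made up
// and do not mirror the real AbstractFetcherThread internals.
class FetcherOrderingSketch {
    private final Map<String, Long> partitionFetchOffsets = new HashMap<>();

    void handleFetchedBatch(String topicPartition, long nextFetchOffset, byte[] batch) {
        // Buggy ordering (line 134 before line 138): advancing the fetch offset
        // first means a CRC failure below silently skips the corrupted batch, and
        // the next fetch (at nextFetchOffset) no longer matches the log end offset.
        //
        //   partitionFetchOffsets.put(topicPartition, nextFetchOffset);
        //   appendToLog(topicPartition, batch);  // throws on corruption

        // Fixed ordering: validate and append first, then advance the offset, so
        // a corrupted batch is simply fetched again instead of being skipped.
        appendToLog(topicPartition, batch);                         // may throw on corruption
        partitionFetchOffsets.put(topicPartition, nextFetchOffset);
    }

    private void appendToLog(String topicPartition, byte[] batch) {
        // placeholder for CRC validation and the local log append
    }
}
{code}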



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15659171#comment-15659171
 ] 

ASF GitHub Bot commented on KAFKA-4384:
---

GitHub user jun-he opened a pull request:

https://github.com/apache/kafka/pull/2127

KAFKA-4384: ReplicaFetcherThread stopped after ReplicaFetcherThread 
received a corrupted message

@becketqin 

Here is the patch for KAFKA-4384 (ReplicaFetcherThread stopped after 
ReplicaFetcherThread received a corrupted message).



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jun-he/kafka KAFKA-4384

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2127


commit 5eabb29d9f8036a2bf3c2955d129e69d93633395
Author: Jun He 
Date:   2016-11-12T05:30:37Z

Patch for KAFKA-4384 (ReplicaFetcherThread stopped after 
ReplicaFetcherThread received a corrupted message)




> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message
> 
>
> Key: KAFKA-4384
> URL: https://issues.apache.org/jira/browse/KAFKA-4384
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Ubuntu 12.04, AWS D2 instance
>Reporter: Jun He
> Fix For: 0.10.1.1
>
> Attachments: KAFKA-4384.patch
>
>
> We recently discovered an issue in Kafka 0.9.0.1, where a 
> ReplicaFetcherThread stopped after it received a corrupted 
> message. As the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they 
> may have a similar issue.
> Here are the system logs related to this issue.
> 2016-10-30 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.apply - Found invalid messages during fetch for 
> partition [logs,41] offset 39021512238 error Message is corrupt (stored crc = 
> 2028421553, computed crc = 3577227678)
> 2016-10-30 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due 
> to kafka.common.KafkaException: error processing data for partition 
> [logs,41] offset 39021512301 Caused by: java.lang.RuntimeException: Offset 
> mismatch: fetched offset = 39021512301, log end offset = 39021512238.
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238) due 
> to some blip.
> Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
>  threw an exception.
> Then, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
>  caught it and logged this error.
> However, line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
>  had already updated the topic partition offset in partitionMap to the latest 
> fetched offset, so ReplicaFetcherThread skipped the batch with the corrupted 
> messages. 
> Based on 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
>  the ReplicaFetcherThread then directly fetched the next batch of messages 
> (with offset 39021512301).
> Next, ReplicaFetcherThread stopped because the log end offset (still 
> 39021512238) didn't match the fetched message (offset 39021512301).
> A quick fix is to move line 134 to be after line 138.
> It would be great to have your comments; please let me know if a Jira issue 
> is needed. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2127: KAFKA-4384: ReplicaFetcherThread stopped after Rep...

2016-11-11 Thread jun-he
GitHub user jun-he opened a pull request:

https://github.com/apache/kafka/pull/2127

KAFKA-4384: ReplicaFetcherThread stopped after ReplicaFetcherThread 
received a corrupted message

@becketqin 

Here is the patch for KAFKA-4384 (ReplicaFetcherThread stopped after 
ReplicaFetcherThread received a corrupted message).



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jun-he/kafka KAFKA-4384

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2127


commit 5eabb29d9f8036a2bf3c2955d129e69d93633395
Author: Jun He 
Date:   2016-11-12T05:30:37Z

Patch for KAFKA-4384 (ReplicaFetcherThread stopped after 
ReplicaFetcherThread received a corrupted message)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-89: Allow sink connectors to decouple flush and offset commit

2016-11-11 Thread Ewen Cheslack-Postava
+1 (binding)

-Ewen

On Wed, Nov 9, 2016 at 2:16 PM, Shikhar Bhushan 
wrote:

> Hi,
>
> I would like to initiate a vote on KIP-89
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit
>
> Best,
>
> Shikhar
>



-- 
Thanks,
Ewen


[jira] [Commented] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing

2016-11-11 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658960#comment-15658960
 ] 

Jiangjie Qin commented on KAFKA-4398:
-

I see, you are saying that the broker should deliver messages based on the 
order of their timestamps, right? This essentially requires Kafka to behave like a 
distributed priority queue, which is not what it was designed for to begin 
with. And this may not even be feasible, because it means the broker has to 
either scan the entire log for each read or index each message by 
timestamp order and jump between different offsets all the time. The 
throughput would be close to 0. And any future insertion of an earlier 
timestamp would change the log order. Although I agree that it would be good to 
have such a product as a distributed priority queue, I do not see how Kafka 
could support that. 

> offsetsForTimes returns false starting offset when timestamp of messages are 
> not monotonically increasing
> -
>
> Key: KAFKA-4398
> URL: https://issues.apache.org/jira/browse/KAFKA-4398
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: huxi
>Assignee: huxi
>
> After a code walk-through for KIP-33 (Add a time based log index), I found a 
> use case where the method 'offsetsForTimes' fails to return the correct offset if 
> a series of messages is created without monotonically increasing 
> timestamps (CreateTime is used).
> Say T0 is the hour when the first message is created, and Tn means n hours 
> after T0. Then, I created another two messages at T1 and T3 respectively. At this 
> moment, the .timeindex should contain two items:
> T1 ---> 1
> T3 ---> 2  (whether it contains T0 does not matter to this problem)
> Later, due to some reason, I want to insert a third message in between T1 and 
> T3, say at T2.5, but the time index file is not changed because of the requirement 
> that timestamps be monotonically increasing within each segment.
> After generating the message with T2.5, I invoke 
> KafkaConsumer.offsetsForTimes("tp" -> T2.5), hoping to get the first offset 
> with a timestamp greater than or equal to T2.5, which should be the third message in 
> this case, but the consumer returns the second message with T3.
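For reference, a minimal sketch of the lookup described in this report, using the public KafkaConsumer.offsetsForTimes API added in 0.10.1.0 (broker address, topic name and the concrete timestamps are placeholders):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "timestamp-lookup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        long t2_5 = 1477792800000L;                     // a timestamp between T1 and T3
        TopicPartition tp = new TopicPartition("tp", 0);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, t2_5));
            // Because the time index only records monotonically increasing timestamps
            // per segment, this returns the entry indexed under T3 (offset 2) instead
            // of the later-produced message with timestamp T2.5 (offset 3).
            OffsetAndTimestamp hit = result.get(tp);
            System.out.println(hit);
        }
    }
}
{code}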



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Subscribe to Kafka

2016-11-11 Thread Guozhang Wang
Husayn,

It is a self-service:

https://kafka.apache.org/contact

cheers,

Guozhang

On Fri, Nov 11, 2016 at 10:37 AM, Husayn Campbell 
wrote:

> Newbie developer to kafka.
>



-- 
-- Guozhang


Re: [Subscribe]

2016-11-11 Thread Guozhang Wang
Hi Ashish,

It's self-service: https://kafka.apache.org/contact


Guozhang


On Fri, Nov 11, 2016 at 4:20 PM, Ashish Singh  wrote:

> Please subscribe me to this group
>



-- 
-- Guozhang


[jira] [Comment Edited] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658812#comment-15658812
 ] 

Jeff Widman edited comment on KAFKA-2076 at 11/12/16 2:22 AM:
--

Thanks [~becket_qin], BTW I thought your talk at the Kafka meetup at LinkedIn a 
few weeks ago was very well presented.


was (Author: jeffwidman):
Thanks [~becket_qin], BTW you gave a great presentation at the Kafka meetup at 
LinkedIn a few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.
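As an aside on the startup use case above, lag for a partition can already be estimated on the consumer side with APIs available as of 0.10.1.0; this is a rough sketch of that approach, not the per-partition high-watermark record this ticket proposes, and it assumes the consumer already has an assignment:

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LagSketch {
    // Estimates how far this consumer is behind the broker's log end offset.
    public static long lag(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
        Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
        long highWatermark = end.get(tp);       // log end offset reported by the broker
        long position = consumer.position(tp);  // next offset this consumer will fetch
        return highWatermark - position;
    }
}
{code}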



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658812#comment-15658812
 ] 

Jeff Widman edited comment on KAFKA-2076 at 11/12/16 2:21 AM:
--

Thanks [~becket_qin], BTW you gave a great presentation at the Kafka meetup at 
LinkedIn a few weeks ago.


was (Author: jeffwidman):
Thanks [~becket_qin], BTW great presentation at the Kafka meetup at LinkedIn a 
few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658812#comment-15658812
 ] 

Jeff Widman commented on KAFKA-2076:


Thanks [~becket_qin], BTW great presentation at the Kafka meetup at LinkedIn a 
few weeks ago.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-11 Thread Mayuresh Gharat
I think "In second stage we move on to supporting only the attribute flag
for log
compaction." means that it will no longer support request from older
clients.

Thanks,

Mayuresh

On Fri, Nov 11, 2016 at 10:02 AM, Becket Qin  wrote:

> Hey Michael,
>
> The way Kafka implements backwards compatibility is to let the brokers
> support old protocols. So the brokers have to support older clients that do
> not understand the new attribute bit. That means we will not be able to get
> away with the null value as a tombstone. So we need a good definition of
> whether we should treat it as a tombstone or not. This is why I think we
> need a magic value bump.
>
> My confusion on the second stage is exactly about "we want to end up just
> supporting tombstone marker (not both)", or in the original statement: "2)
> In second stage we move on to supporting only the attribute flag for log
> compaction." - We will always support the null value as a tombstone as long
> as the message format version (i.e. magic byte) is less than 2 for the
> reason I mentioned above.
>
> Although the way to interpret the message bytes at producing time can be
> inferred from the ProduceRequest version, this version information
> won't be stored in the broker. So when an old consumer comes, we need to
> decide whether we have to do down conversion or not. With a clear existing
> configuration of the message format version (i.e. magic value), we know for
> sure when to down convert the messages to adapt to older clients. Otherwise
> we will have to always scan all the messages. It would probably work but
> relies on guesswork or inference.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 11, 2016 at 8:42 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Sounds good Michael.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce 
> > wrote:
> >
> > > @Mayuresh i don't think you've missed anything -
> > >
> > > as per earlier in the discussion.
> > >
> > > We're providing new api versions, but not planning on bumping the magic
> > > number as there is no structural changes, we are simply using up a new
> > > attribute bit (as like adding new compression support just uses up
> > > additional attribute bits)
> > >
> > >
> > > I think also difference between null vs 0 length byte array is covered.
> > >
> > > @Becket,
> > >
> > > The two stage approach is because we want to end up just supporting
> > > tombstone marker (not both)
> > >
> > > But we accept we need to allow organisations and systems a period of
> > > transition (this is what stage 1 provides)
> > >
> > >
> > >
> > >
> > > 
> > > From: Mayuresh Gharat 
> > > Sent: Thursday, November 10, 2016 8:57 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > I am not sure if we are bumping magicByte value as the KIP does not
> > mention
> > > it (If I didn't miss anything).
> > >
> > > @Nacho : I did not understand what you meant by : Conversion from one to
> > > the other may be possible but I would rather not have to do it.
> > >
> > > You will have to do the conversion for the scenario that Becket has
> > > mentioned above where an old consumer talks to the new broker, right?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Thu, Nov 10, 2016 at 11:54 AM, Becket Qin 
> > wrote:
> > >
> > > > Nacho,
> > > >
> > > > In Kafka protocol, a negative length is null, a zero length means empty
> > > > byte array.
> > > >
> > > > I am still confused about the two stages. It seems that the broker only
> > > > needs to understand three versions of messages.
> > > > 1. MagicValue=0 - no timestamp, absolute offset, no tombstone flag
> > > > 2. MagicValue=1 - with timestamp, relative offsets, no tombstone flag (null
> > > > value = tombstone)
> > > > 3. MagicValue=2 - with timestamp, relative offsets, tombstone flag
> > > >
> > > > We are talking about two flavors for 3:
> > > > 3.1 tombstone flag set = tombstone (Allows a key with null value in the
> > > > compacted topics)
> > > > 3.2 tombstone flag set OR null value = tombstone ( Do not allow a key with
> > > > null value in the compacted topics)
> > > >
> > > > No matter which flavor we choose, we just need to stick to that way of
> > > > interpretation, right? Why would we need a second stage?
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Nov 10, 2016 at 10:37 AM, Ignacio Solis 
> > wrote:
> > > >
> > > > > A quick differentiation I would like to make is null is not the same as
> > > > > size 0.
> > > > >
> > > > > Many times these are made equal, but they're not.  When serializing data,
> > > > > we have to make a choice in null values and many times these are translated
> > > > > to zero length blobs. This is not really the same thing.
> > > > >
> > > > > From this perspective, what can Kafka represent and what is consider

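For readers following the thread, a small sketch of the two "flavor 3" interpretations being debated for a hypothetical magic value 2 message (the attribute bit below is made up for illustration and is not an actual Kafka constant, and neither flavor reflects a decision):

{code:java}
public class TombstoneFlavorsSketch {
    private static final int TOMBSTONE_FLAG = 0x08;  // illustrative bit only

    // Flavor 3.1: only the attribute bit marks a tombstone, so a compacted topic
    // can hold a key whose value is null without that record being a delete.
    static boolean isTombstoneFlagOnly(byte attributes, byte[] value) {
        return (attributes & TOMBSTONE_FLAG) != 0;
    }

    // Flavor 3.2: either the attribute bit or a null value marks a tombstone,
    // which keeps the old null-value convention but disallows "real" null values.
    static boolean isTombstoneFlagOrNull(byte attributes, byte[] value) {
        return (attributes & TOMBSTONE_FLAG) != 0 || value == null;
    }
}
{code}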
[Subscribe]

2016-11-11 Thread Ashish Singh
Please subscribe me to this group


[jira] [Commented] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing

2016-11-11 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658690#comment-15658690
 ] 

huxi commented on KAFKA-4398:
-

[~becket_qin] Based on the current design, you are definitely right and I 
totally agree with what you said. But as an end user who does not care about 
offsets in this case, he/she might want to consume messages strictly in the 
order of message creation time, although Kafka does not support that right now. So 
I am just curious about the possibility of a design change. Of course, I am 
perfectly okay with closing this issue if you think the usage is not a valid 
scenario.

> offsetsForTimes returns false starting offset when timestamp of messages are 
> not monotonically increasing
> -
>
> Key: KAFKA-4398
> URL: https://issues.apache.org/jira/browse/KAFKA-4398
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: huxi
>Assignee: huxi
>
> After a code walk-through for KIP-33 (Add a time based log index), I found a 
> use case where the method 'offsetsForTimes' fails to return the correct offset if 
> a series of messages is created without monotonically increasing 
> timestamps (CreateTime is used).
> Say T0 is the hour when the first message is created, and Tn means n hours 
> after T0. Then, I created another two messages at T1 and T3 respectively. At this 
> moment, the .timeindex should contain two items:
> T1 ---> 1
> T3 ---> 2  (whether it contains T0 does not matter to this problem)
> Later, due to some reason, I want to insert a third message in between T1 and 
> T3, say at T2.5, but the time index file is not changed because of the requirement 
> that timestamps be monotonically increasing within each segment.
> After generating the message with T2.5, I invoke 
> KafkaConsumer.offsetsForTimes("tp" -> T2.5), hoping to get the first offset 
> with a timestamp greater than or equal to T2.5, which should be the third message in 
> this case, but the consumer returns the second message with T3.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk8 #1038

2016-11-11 Thread Apache Jenkins Server
See 



Subscribe to Kafka

2016-11-11 Thread Husayn Campbell
Newbie developer to kafka.


[jira] [Created] (KAFKA-4400) Prefix for sink task consumer groups should be configurable

2016-11-11 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-4400:


 Summary: Prefix for sink task consumer groups should be 
configurable
 Key: KAFKA-4400
 URL: https://issues.apache.org/jira/browse/KAFKA-4400
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.1.0
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava


Currently the prefix for creating consumer groups is fixed. This means that if 
you run multiple Connect clusters using the same Kafka cluster and create 
connectors with the same name, sink tasks in different clusters will join the 
same group. Making this prefix configurable at the worker level would protect 
against this.

An alternative would be to define unique cluster IDs for each connect cluster, 
which would allow us to construct a unique name for the group without requiring 
yet another config (but presents something of a compatibility challenge).
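A small illustration of the collision and of the proposed configurable prefix; the "connect-" prefix reflects the current fixed convention, while the worker-level setting name used below is hypothetical:

{code:java}
public class SinkGroupIdSketch {
    public static void main(String[] args) {
        String connectorName = "s3-sink";

        // Two Connect clusters sharing one Kafka cluster, same connector name:
        String clusterA = "connect-" + connectorName;   // "connect-s3-sink"
        String clusterB = "connect-" + connectorName;   // "connect-s3-sink" -> same group

        // With a configurable prefix per worker, the groups stay distinct:
        String prefixA = "connect-cluster-a-";          // from a proposed worker-level config
        String prefixB = "connect-cluster-b-";
        System.out.println(prefixA + connectorName);    // "connect-cluster-a-s3-sink"
        System.out.println(prefixB + connectorName);    // "connect-cluster-b-s3-sink"
    }
}
{code}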



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2126: MINOR: Fix export command for additional env vars ...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2126


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4346) Add foreachValue method to KStream

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4346:
-
Assignee: Xavier Léauté

> Add foreachValue method to KStream
> --
>
> Key: KAFKA-4346
> URL: https://issues.apache.org/jira/browse/KAFKA-4346
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Minor
>  Labels: newbie
>
> This would be the value-only counterpart to foreach, similar to mapValues.
> Adding this method would enhance readability and allow for Java 8 syntactic 
> sugar using method references without having to wrap existing methods that 
> only operate on the value type.
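A sketch of the difference this would make; foreachValue does not exist yet, and the value handler below is a placeholder:

{code:java}
import org.apache.kafka.streams.kstream.KStream;

public class ForeachValueSketch {
    static void currentStyle(KStream<String, String> stream) {
        // Today the key must be named even when only the value is used, which
        // blocks a plain method reference:
        stream.foreach((key, value) -> handleValue(value));
    }

    // Proposed: stream.foreachValue(ForeachValueSketch::handleValue);

    static void handleValue(String value) {
        System.out.println(value);
    }
}
{code}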



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4120.
--
   Resolution: Fixed
Fix Version/s: 0.10.1.0

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
> Fix For: 0.10.1.0
>
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)
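A stand-alone Java snippet reproducing the root cause described above (array keys use reference equality/hashing, so content-equal byte arrays are distinct map keys):

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeySketch {
    public static void main(String[] args) {
        byte[] k1 = "user-42".getBytes();
        byte[] k2 = "user-42".getBytes();

        System.out.println(k1.equals(k2));          // false: reference equality
        System.out.println(Arrays.equals(k1, k2));  // true:  content equality

        Map<byte[], String> cache = new HashMap<>();
        cache.put(k1, "new-value");
        System.out.println(cache.get(k2));          // null -> the stale-read symptom
    }
}
{code}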



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-11-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658318#comment-15658318
 ] 

Guozhang Wang commented on KAFKA-4120:
--

Marking it as resolved after 0.10.1.0. [~elevy] [~gfodor] please feel free to 
re-open if your use case still has similar issues.

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2126: MINOR: Fix export command for additional env vars ...

2016-11-11 Thread kkonstantine
GitHub user kkonstantine opened a pull request:

https://github.com/apache/kafka/pull/2126

MINOR: Fix export command for additional env vars in connect system tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kkonstantine/kafka 
MINOR-Fix-formatting-of-env-vars-in-connect-system-test-template

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2126


commit e5f230aef65bd44b52a28cc661e0b51b8205c1c3
Author: Konstantine Karantasis 
Date:   2016-11-11T21:39:35Z

MINOR: Fix export command for additional env vars in connect system tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4212:
-
Labels: api  (was: )

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>  Labels: api
>
> Some jobs need to maintain, as state, a large set of key-values for some 
> period of time, i.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.
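A sketch of the workaround mentioned above, assuming the 0.10.x WindowStore API; method and variable names are illustrative:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowStoreAsTtlCache {
    // Treat a windowed store as a TTL cache: fetch a key's entries within the
    // TTL window and keep the most recent one.
    public static <K, V> V latestWithinTtl(WindowStore<K, V> store, K key,
                                           long nowMs, long ttlMs) {
        V latest = null;
        try (WindowStoreIterator<V> iter = store.fetch(key, nowMs - ttlMs, nowMs)) {
            while (iter.hasNext()) {
                KeyValue<Long, V> entry = iter.next();  // entries are ordered by timestamp
                latest = entry.value;                   // keep the newest seen
            }
        }
        return latest;                                  // null if expired or absent
    }
}
{code}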



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4212:
-
Assignee: (was: Guozhang Wang)

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>  Labels: api
>
> Some jobs needs to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4317) RocksDB checkpoint files lost on kill -9

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4317:
-
Priority: Critical  (was: Major)

> RocksDB checkpoint files lost on kill -9
> 
>
> Key: KAFKA-4317
> URL: https://issues.apache.org/jira/browse/KAFKA-4317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Priority: Critical
>  Labels: user-experience
>
> Right now, the checkpoint files for logged RocksDB stores are written during 
> a graceful shutdown, and removed upon restoration. Unfortunately this means 
> that in a scenario where the process is forcibly killed, the checkpoint files 
> are not there, so all RocksDB stores are rematerialized from scratch on the 
> next launch.
> In a way, this is good, because it simulates bootstrapping a new node (for 
> example, it's a good way to see how much I/O is used to rematerialize the 
> stores); however, it leads to longer recovery times when a non-graceful 
> shutdown occurs and we want to get the job up and running again.
> There seem to be two possible things to consider:
> - Simply do not remove checkpoint files on restoring. This way a kill -9 will 
> result in only repeating the restoration of all the data generated in the 
> source topics since the last graceful shutdown.
> - Continually update the checkpoint files (perhaps on commit) -- this would 
> result in the least amount of overhead/latency in restarting, but the 
> additional complexity may not be worth it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4240) Remove disableLogging from API

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4240:
-
Assignee: (was: Guozhang Wang)

> Remove disableLogging from API
> --
>
> Key: KAFKA-4240
> URL: https://issues.apache.org/jira/browse/KAFKA-4240
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
> Fix For: 0.10.2.0
>
>
> The disableLogging API in PersistentKeyValueFactory is potentially not needed 
> since all stores should have a backing changelog for recovery. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3522) Consider adding version information into rocksDB storage format

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3522:
-
Priority: Critical  (was: Major)

> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Ishita Mandhan
>Priority: Critical
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path for the future when we change the data format, and 
> hence having some version info stored along with the storage file / directory 
> would be useful.
> This information could even live outside the storage file; for example, we 
> can just use a small "version indicator" file in the rocksdb directory for 
> this purpose. Thoughts? [~enothereska] [~jkreps]
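A minimal sketch of the "version indicator" file idea; the file name and format below are made up for illustration:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class StoreVersionFile {
    private static final String FILE_NAME = ".storage-format-version";  // illustrative

    // Record the storage format version next to the RocksDB files.
    static void write(Path storeDir, int version) throws IOException {
        Files.write(storeDir.resolve(FILE_NAME),
                    Integer.toString(version).getBytes(StandardCharsets.UTF_8));
    }

    // Read the recorded version, treating a missing file as a pre-versioning store.
    static int readOrDefault(Path storeDir, int defaultVersion) throws IOException {
        Path f = storeDir.resolve(FILE_NAME);
        if (!Files.exists(f)) {
            return defaultVersion;
        }
        return Integer.parseInt(
                new String(Files.readAllBytes(f), StandardCharsets.UTF_8).trim());
    }
}
{code}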



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread radai
ok, i've made the following changes:

1. memory.pool.class.name has been removed
2. the code now only uses SimpleMemoryPool. the gc variant is left (unused)
as a development aid and is unsettable via configuration.
3. I've resolved the issue of stale data getting stuck in intermediate
(ssl) buffers.
4. default value for queued.max.bytes is -1, so off by default. any <=0
value is interpreted as off by the underlying code.

open points:

1. the kafka config framework doesnt allow a value to be either long or
double, so in order to pull off the queued.max.bytes = 100 or
queued.max.bytes = 0.3 thing i'd need to define the config as type string,
which is ugly to me. do we want to support setting queued.max.bytes to % of
heap ? if so, by way of making queued.max.bytes of type string, or by way
of a 2nd config param (with the resulting either/all/combination?
validation). my personal opinion is string because i think a single
queued.max.bytes with overloaded meaning is more understandable to users.
i'll await other people's opinions before doing anything.
2. i still need to evaluate rajini's optimization. sounds doable.

asides:

1. i think you guys misunderstood the intent behind the gc pool. it was
never meant to be a magic pool that automatically releases buffers (because,
just as rajini stated, the performance implications would be horrible). it
was meant to catch leaks early. since that is indeed a dev-only concern it
won't ever get used in production.
2. i said this on some other kip discussion: i think the nice thing about
the pool API is it "scales" from just keeping a memory bound to actually
re-using buffers without changing the calling code. i think actually pooling
large buffers will result in a significant performance impact, but that's
outside the scope of this kip. at that point i think more pool
implementations (that actually pool) would be written. i agree with the
ideal of exposing as few knobs as possible, but switching pools (or pool
params) for tuning may happen at some later point.
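To make open point 1 concrete, here is a sketch of how an overloaded string-typed queued.max.bytes could be parsed: values greater than 1 are absolute bytes, values in (0, 1] are a fraction of the JVM heap, and anything <= 0 (the -1 default) disables the bound. This is the proposal only, not shipped configuration code.

{code:java}
public class QueuedMaxBytesSketch {
    static long parseQueuedMaxBytes(String raw) {
        double v = Double.parseDouble(raw.trim());
        if (v <= 0) {
            return -1L;                                           // bound disabled
        }
        if (v <= 1.0) {
            return (long) (Runtime.getRuntime().maxMemory() * v); // fraction of heap
        }
        return (long) v;                                          // absolute bytes
    }

    public static void main(String[] args) {
        System.out.println(parseQueuedMaxBytes("-1"));         // off
        System.out.println(parseQueuedMaxBytes("0.3"));        // 30% of max heap
        System.out.println(parseQueuedMaxBytes("100000000"));  // ~100 MB absolute
    }
}
{code}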



On Fri, Nov 11, 2016 at 11:44 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> 13. At the moment, I think channels are not muted if:
> channel.receive != null && channel.receive.buffer != null
> This mutes all channels that aren't holding onto an incomplete buffer. They
> may or may not have read the 4-byte size.
>
> I was thinking you could avoid muting channels if:
> channel.receive == null || channel.receive.size.remaining()
> This will not mute channels that are holding onto a buffer (as above). In
> addition, it will not mute channels that haven't read the 4-byte size. A
> client that is closed gracefully while the pool is full will not be muted
> in this case and the server can process close without waiting for the pool
> to free up. Once the 4-byte size is read, the channel will be muted if the
> pool is still out of memory - for each channel, at most one failed read
> attempt would be made while the pool is out of memory. I think this would
> also delay muting of SSL channels since they can continue to read into
> their (already allocated) network buffers and unwrap the data and block
> only when they need to allocate a buffer from the pool.
>
> On Fri, Nov 11, 2016 at 6:00 PM, Jay Kreps  wrote:
>
> > Hey Radai,
> >
> > +1 on deprecating and eventually removing the old config. The intention was
> > absolutely bounding memory usage. I think having two ways of doing this,
> > one that gives a crisp bound on memory and one that is hard to reason about
> > is pretty confusing. I think people will really appreciate having one
> > config which instead lets them directly control the thing they actually
> > care about (memory).
> >
> > I also want to second Jun's concern on the complexity of the self-GCing
> > memory pool. I wrote the memory pool for the producer. In that area the
> > pooling of messages is the single biggest factor in performance of the
> > client so I believed it was worth some sophistication/complexity if there
> > was a performance payoff. All the same, the complexity of that code has made
> > it VERY hard to keep correct (it gets broken roughly every other time
> > someone makes a change). Over time I came to feel a lot less proud of my
> > cleverness. I learned something interesting reading your self-GCing memory
> > pool, but I wonder if the complexity is worth the payoff in this case?
> >
> > Philosophically we've tried really hard to avoid needlessly "pluggable"
> > implementations. That is, when there is a temptation to give a config that
> > plugs in different Java classes at run time for implementation choices, we
> > should instead think of how to give the user the good behavior
> > automatically. I think the use case for configuring the GCing pool would
> > be if you discovered a bug in which memory leaked. But this isn't something
> > the user should have to think about, right? If there is a bug we should find
> > and fix it.
> >
> > -Jay
> >
>

[jira] [Updated] (KAFKA-4270) ClassCast for Agregation

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4270:
-
Labels: architecture  (was: )

> ClassCast for Agregation
> 
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mykola Polonskyi
>  Labels: architecture
>
> With serdes defined for the intermediate topic in an aggregation, I catch a 
> ClassCastException from the custom class to ByteArray.
> While debugging I saw that the defined serde isn't used when creating the sink 
> node (inside 
> `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`). 
> Instead of the serde defined in the aggregation call, a default implementation 
> with empty stubs is used: 
> {code:kotlin} 
> userTable.join(
> skicardsTable.groupBy { key, value -> 
> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
> .aggregate(
> { mutableSetOf() }, 
> { ownerId, skicardInfo, accumulator -> 
> accumulator.put(skicardInfo) },
> { ownerId, skicardInfo, accumulator -> 
> accumulator }, 
> skicardByOwnerIdSerde,
> skicardByOwnerIdTopicName
> ),
> { userCreatedOrUpdated, skicardInfoSet -> 
> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
> userWithSkicardsTable
> )
> {code}
> I think the current behavior of `doAggregate` with respect to serde and/or store 
> setup should be changed because it is incorrect in release 0.10.0.1-cp1 too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4270) ClassCast for Agregation

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4270:
-
Priority: Critical  (was: Major)

> ClassCast for Agregation
> 
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mykola Polonskyi
>Priority: Critical
>  Labels: architecture
>
> With serdes defined for the intermediate topic in an aggregation, I catch a 
> ClassCastException from the custom class to ByteArray.
> While debugging I saw that the defined serde isn't used when creating the sink 
> node (inside 
> `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`). 
> Instead of the serde defined in the aggregation call, a default implementation 
> with empty stubs is used: 
> {code:kotlin} 
> userTable.join(
> skicardsTable.groupBy { key, value -> 
> KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
> .aggregate(
> { mutableSetOf() }, 
> { ownerId, skicardInfo, accumulator -> 
> accumulator.put(skicardInfo) },
> { ownerId, skicardInfo, accumulator -> 
> accumulator }, 
> skicardByOwnerIdSerde,
> skicardByOwnerIdTopicName
> ),
> { userCreatedOrUpdated, skicardInfoSet -> 
> UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
> userWithSkicardsTable
> )
> {code}
> I think the current behavior of `doAggregate` with respect to serde and/or store 
> setup should be changed because it is incorrect in release 0.10.0.1-cp1 too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-11-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658156#comment-15658156
 ] 

Guozhang Wang commented on KAFKA-4281:
--

In hindsight of KIP-63, I think it is generally a better idea to have 
finer granularity for result forwarding with caching turned on. [~damianguy] 
Do you want to chime in here and help Greg contribute towards this direction 
if you agree?

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this unfortunately a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)
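For context, here is a workaround sketch for the latency issue described above using configuration that exists in 0.10.1.0 (not the opt-in API proposed in the attached PR); the application id and bootstrap servers are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class LowLatencyAggConfigSketch {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-latency-agg");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Disable the KIP-63 record cache and shorten the commit interval so
        // aggregation results are forwarded downstream almost immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        return props;
    }
}
{code}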



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4219) Permit setting of event time in stream processor

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4219:
-
Labels: api  (was: api user-experience)

> Permit setting of event time in stream processor
> 
>
> Key: KAFKA-4219
> URL: https://issues.apache.org/jira/browse/KAFKA-4219
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>  Labels: api
>
> Event time is assigned in stream sources via {{TimestampExtractor}}.  Once 
> the event time has been assigned, it remains the same, regardless of any 
> downstream processing in the topology.  This is insufficient for many 
> processing jobs, particularly when the output of the job is written back into 
> a Kafka topic, where the record's time is encoded outside of the record's 
> value.
> For instance:
> * When performing windowed aggregations it may be desirable for the timestamp 
> of the emitted record to be the lower or upper limit of the time window, rather 
> than the timestamp of the last processed element, which may be anywhere 
> within the time window.
> * When joining two streams, it is non-deterministic which of the two records' 
> timestamps will be the timestamp of the emitted record.  It would be either 
> one depending on what order the records are processed.  Even were this 
> deterministic, it may be desirable for the emitted timestamp to be altogether 
> different from the timestamp of the joined records.  For instance, setting 
> the timestamp to the current processing time may be desirable.
> * In general, lower level processors may wish to set the timestamp of emitted 
> records to an arbitrary value.
>  
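For context, a sketch of where event time is assigned today via the 0.10.x TimestampExtractor interface; it runs only at the source, and downstream processors cannot change the timestamp of records they forward, which is the gap this ticket describes. MyEvent and its eventTimeMs() accessor are hypothetical.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EmbeddedTimestampExtractor implements TimestampExtractor {

    interface MyEvent {
        long eventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof MyEvent) {
            return ((MyEvent) value).eventTimeMs();  // event time embedded in the payload
        }
        return record.timestamp();                   // fall back to the record timestamp
    }
}
{code}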



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4281:
-
Assignee: Greg Fodor

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this unfortunately a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4281:
-
Status: Patch Available  (was: Open)

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this unfortunately a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4219) Permit setting of event time in stream processor

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4219:
-
Labels: api user-experience  (was: )

> Permit setting of event time in stream processor
> 
>
> Key: KAFKA-4219
> URL: https://issues.apache.org/jira/browse/KAFKA-4219
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>  Labels: api
>
> Event time is assigned in stream sources via {{TimestampExtractor}}.  Once 
> the event time has been assigned, it remains the same, regardless of any 
> downstream processing in the topology.  This is insufficient for many 
> processing jobs, particularly when the output of the job is written back into 
> a Kafka topic, where the record's time is encoded outside of the record's 
> value.
> For instance:
> * When performing windowed aggregations it may be desirable for the timestamp 
> of the emitted record to be the lower or upper limit of the time window, rather 
> than the timestamp of the last processed element, which may be anywhere 
> within the time window.
> * When joining two streams, it is non-deterministic which of the two records' 
> timestamps will be the timestamp of the emitted record.  It could be either 
> one depending on the order in which the records are processed.  Even if this were 
> deterministic, it may be desirable for the emitted timestamp to be altogether 
> different from the timestamp of the joined records.  For instance, setting 
> the timestamp to the current processing time may be desirable.
> * In general, lower level processors may wish to set the timestamp of emitted 
> records to an arbitrary value.
>  
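For context, the only timestamp hook available today is the source-level extractor mentioned above. A minimal sketch of a custom extractor, assuming the 0.10.x single-argument TimestampExtractor signature and a hypothetical payload type that carries its own event time, looks roughly like the code below; it is registered via the timestamp.extractor config and, as the ticket points out, it only applies at the sources:

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {

    // Hypothetical payload interface used only for this sketch.
    public interface TimedEvent {
        long eventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof TimedEvent) {
            // Use the event time carried inside the payload.
            return ((TimedEvent) value).eventTimeMs();
        }
        // Fall back to the timestamp the producer/broker attached to the record.
        return record.timestamp();
    }
}
{code}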



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #1037

2016-11-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4379: Remove caching of dirty and removed keys from

[jason] KAFKA-4081; KafkaConsumer should not allow negative offsets to be

--
[...truncated 14328 lines...]

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportMultipleBootstrapServers PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfKeySerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportNonPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetRestoreConsumerConfigs 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedRestoreConsumerConfigs PASSED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose STARTED

org.apache.kafka.streams.KafkaStreamsTest > testStartAndClose PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[0] PASSED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] STARTED

org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest > 
shouldCountClicksPerRegion[1] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldBeAbleToQueryState[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
shouldNotMakeStoreAvailableUntilAllStoresAvailable[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
queryOnRebalance[0] PASSED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 
concurrentAccesses[0] STARTED

org.apache.kafka.streams.integration.QueryableStateIntegrationTest > 

Jenkins build is back to normal : kafka-trunk-jdk7 #1686

2016-11-11 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658044#comment-15658044
 ] 

Alexey Ozeritskiy commented on KAFKA-4399:
--

For now, I think you can use the following config as a workaround:
{code}
# 100 days
offsets.retention.minutes=144000
# 1 hour
offsets.retention.check.interval.ms=3600000
{code}

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> that patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4399:
-
Status: Patch Available  (was: Open)

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> that patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658027#comment-15658027
 ] 

Alexey Ozeritskiy commented on KAFKA-4399:
--

The simplest way to fix it is to move the expensive code (i.e. the 
appendMessagesToLeader call) outside the lock. See the PR.
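To illustrate the shape of that change (the actual broker code is Scala; the Java sketch below uses made-up names and is not the PR itself): do only cheap bookkeeping while the group lock is held, and perform the expensive log append after the lock is released, so an offset commit that needs the same lock can no longer deadlock against cleanup.

{code}
import java.util.ArrayList;
import java.util.List;

// Sketch of the "collect under the lock, append outside the lock" pattern; all names are made up.
public class GroupCleanupSketch {
    private final Object groupLock = new Object();
    private final List<String> expiredOffsetKeys = new ArrayList<>();

    public void cleanupGroupMetadata() {
        List<String> toDelete;
        synchronized (groupLock) {
            // Only cheap bookkeeping happens while the lock is held.
            toDelete = new ArrayList<>(expiredOffsetKeys);
            expiredOffsetKeys.clear();
        }
        // The expensive append (the analogue of appendMessagesToLeader) runs without
        // the lock, so a concurrent offset commit that needs the same lock cannot
        // deadlock against cleanup.
        appendTombstones(toDelete);
    }

    public void trackExpiredOffset(String offsetKey) {
        synchronized (groupLock) {
            expiredOffsetKeys.add(offsetKey); // stand-in for updating group offset state
        }
    }

    private void appendTombstones(List<String> toDelete) {
        // stand-in for the real append to the __consumer_offsets log
    }
}
{code}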

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> that patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658021#comment-15658021
 ] 

ASF GitHub Bot commented on KAFKA-4399:
---

GitHub user resetius opened a pull request:

https://github.com/apache/kafka/pull/2125

KAFKA-4399; deadlock cleanupGroupMetadata and offset commit fixed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/resetius/kafka KAFKA-4399

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2125


commit e7e6c4fcff0cb72f6dac46a3c6416ee128e6028c
Author: Alexey Ozeritsky 
Date:   2016-11-11T19:58:31Z

KAFKA-4399; deadlock cleanupGroupMetadata and offset commit fixed




> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> that patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2125: KAFKA-4399; deadlock cleanupGroupMetadata and offs...

2016-11-11 Thread resetius
GitHub user resetius opened a pull request:

https://github.com/apache/kafka/pull/2125

KAFKA-4399; deadlock cleanupGroupMetadata and offset commit fixed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/resetius/kafka KAFKA-4399

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2125.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2125


commit e7e6c4fcff0cb72f6dac46a3c6416ee128e6028c
Author: Alexey Ozeritsky 
Date:   2016-11-11T19:58:31Z

KAFKA-4399; deadlock cleanupGroupMetadata and offset commit fixed




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-4399:
-
Attachment: deadlock-stack

> Deadlock between cleanupGroupMetadata and offset commit
> ---
>
> Key: KAFKA-4399
> URL: https://issues.apache.org/jira/browse/KAFKA-4399
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Alexey Ozeritskiy
>Priority: Blocker
> Attachments: deadlock-stack
>
>
> We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
> We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
> that patch did not help us and our stack traces are different. I think it is a different issue.
> Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4399) Deadlock between cleanupGroupMetadata and offset commit

2016-11-11 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-4399:


 Summary: Deadlock between cleanupGroupMetadata and offset commit
 Key: KAFKA-4399
 URL: https://issues.apache.org/jira/browse/KAFKA-4399
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Alexey Ozeritskiy
Priority: Blocker


We have upgraded our clusters to 0.10.1.0 and hit a deadlock issue.
We thought it was something like https://issues.apache.org/jira/browse/KAFKA-3994, but 
that patch did not help us and our stack traces are different. I think it is a different issue.

Stack traces are attached.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread Rajini Sivaram
13. At the moment, I think channels are not muted if:
channel.receive != null && channel.receive.buffer != null
This mutes all channels that aren't holding onto an incomplete buffer. They
may or may not have read the 4-byte size.

I was thinking you could avoid muting channels if:
channel.receive == null || channel.receive.size.remaining() > 0
This will not mute channels that are holding onto a buffer (as above). In
addition, it will not mute channels that haven't read the 4-byte size. A
client that is closed gracefully while the pool is full will not be muted
in this case and the server can process close without waiting for the pool
to free up. Once the 4-byte size is read, the channel will be muted if the
pool is still out of memory - for each channel, at most one failed read
attempt would be made while the pool is out of memory. I think this would
also delay muting of SSL channels since they can continue to read into
their (already allocated) network buffers and unwrap the data and block
only when they need to allocate a buffer from the pool.
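A rough sketch of the rule being proposed here, with illustrative type and method names that mirror the pseudo-conditions above rather than the real Selector/KafkaChannel API:

{code}
// Sketch only: decide whether a channel should be muted while the memory pool is exhausted.
public class MutePolicySketch {

    interface Receive {
        boolean sizeHeaderComplete();   // has the 4-byte size prefix been fully read?
    }

    interface Channel {
        Receive currentReceive();       // null if no request is currently being read
    }

    /** Mute only channels that already know their request size but would need a pool buffer. */
    boolean mustMute(Channel channel, boolean poolOutOfMemory) {
        if (!poolOutOfMemory) {
            return false;
        }
        Receive receive = channel.currentReceive();
        if (receive == null || !receive.sizeHeaderComplete()) {
            // Nothing read yet, or still reading the 4-byte size: keep the channel readable
            // so small header reads and graceful closes are not blocked by the pool.
            return false;
        }
        return true;
    }
}
{code}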

On Fri, Nov 11, 2016 at 6:00 PM, Jay Kreps  wrote:

> Hey Radai,
>
> +1 on deprecating and eventually removing the old config. The intention was
> absolutely bounding memory usage. I think having two ways of doing this,
> one that gives a crisp bound on memory and one that is hard to reason about
> is pretty confusing. I think people will really appreciate having one
> config which instead lets them directly control the thing they actually
> care about (memory).
>
> I also want to second Jun's concern on the complexity of the self-GCing
> memory pool. I wrote the memory pool for the producer. In that area the
> pooling of messages is the single biggest factor in performance of the
> client so I believed it was worth some sophistication/complexity if there
> was performance payoff. All the same, the complexity of that code has made
> it VERY hard to keep correct (it gets broken roughly every other time
> someone makes a change). Over time I came to feel a lot less proud of my
> cleverness. I learned something interesting reading your self-GCing memory
> pool, but I wonder if the complexity is worth the payoff in this case?
>
> Philosophically we've tried really hard to avoid needlessly "pluggable"
> implementations. That is, when there is a temptation to give a config that
> plugs in different Java classes at run time for implementation choices, we
> should instead think of how to give the user the good behavior
> automatically. I think the use case for configuring a the GCing pool would
> be if you discovered a bug in which memory leaked. But this isn't something
> the user should have to think about right? If there is a bug we should find
> and fix it.
>
> -Jay
>
> On Fri, Nov 11, 2016 at 9:21 AM, radai  wrote:
>
> > jun's #1 + rajini's #11 - the new config param is to enable changing the
> > pool implentation class. as i said in my response to jun i will make the
> > default pool impl be the simple one, and this param is to allow a user
> > (more likely a dev) to change it.
> > both the simple pool and the "gc pool" make basically just an
> > AtomicLong.get() + (hashmap.put for gc) calls before returning a buffer.
> > there is absolutely no dependency on GC times in allocating (or not). the
> > extra background thread in the gc pool is forever asleep unless there are
> > bugs (==leaks) so the extra cost is basically nothing (backed by
> > benchmarks). let me reiterate again - ANY BUFFER ALLOCATED MUST ALWAYS BE
> > RELEASED - so the gc pool should not rely on gc for reclaiming buffers. it's
> > a bug detector, not a feature, and is definitely not intended to hide bugs -
> > the exact opposite - it's meant to expose them sooner. i've cleaned up the
> > docs to avoid this confusion. i also like the fail on leak. will do.
> > as for the gap between pool size and heap size - that's a valid argument.
> > maybe also allow sizing the pool as a % of heap size? so queued.max.bytes =
> > 100 for 1MB and queued.max.bytes = 0.25 for 25% of available heap?
> >
> > jun's 2.2 - queued.max.bytes + socket.request.max.bytes still holds,
> > assuming the ssl-related buffers are small. the largest weakness in this
> > claim has to do with decompression rather than anything ssl-related. so yes
> > there is an O(#ssl connections * sslEngine packet size) component, but i
> > think it's small. again - decompression should be the concern.
> >
> > rajini's #13 - interesting optimization. the problem is there's no knowing
> > in advance what the _next_ request to come out of a socket is, so this
> > would mute just those sockets that are 1. mutable and 2. have a
> > buffer-demanding request for which we could not allocate a buffer. downside
> > is that as-is this would cause the busy-loop on poll() that the mutes were
> > supposed to prevent - or code would need to be added to ad-hoc mute a
> > connection that was so-far unmuted but has now generated a memory-demanding
> > r

[jira] [Resolved] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-2076.
-
   Resolution: Fixed
Fix Version/s: 0.10.1.0

After 
KIP-79(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090)
 the consumers are able to get the high watermark of the logs. Closing this 
ticket.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.
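For reference, since 0.10.1.0 the KIP-79 additions on the new consumer cover this use case directly. A minimal sketch (the consumer is assumed to already have an assignment) that logs the per-partition lag on startup:

{code}
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StartupLagSketch {
    // Logs how far behind the consumer is for each of its assigned partitions.
    static void logStartupLag(KafkaConsumer<byte[], byte[]> consumer) {
        Collection<TopicPartition> assignment = consumer.assignment();
        // endOffsets returns the high watermark of each partition (new in 0.10.1.0).
        Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(assignment);
        for (Map.Entry<TopicPartition, Long> entry : highWatermarks.entrySet()) {
            long lag = entry.getValue() - consumer.position(entry.getKey());
            System.out.println(entry.getKey() + " lag=" + lag);
        }
    }
}
{code}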



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657963#comment-15657963
 ] 

Jiangjie Qin commented on KAFKA-2076:
-

[~jeffwidman] The consumer is actually able to get the HW after 
KIP-79 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090).
 
I should just close this ticket as solved. :)

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3224) Add timestamp-based log deletion policy

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657889#comment-15657889
 ] 

ASF GitHub Bot commented on KAFKA-3224:
---

Github user bill-warshaw closed the pull request at:

https://github.com/apache/kafka/pull/1972


> Add timestamp-based log deletion policy
> ---
>
> Key: KAFKA-3224
> URL: https://issues.apache.org/jira/browse/KAFKA-3224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bill Warshaw
>  Labels: kafka
>
> One of Kafka's officially-described use cases is a distributed commit log 
> (http://kafka.apache.org/documentation.html#uses_commitlog). In this case, 
> for a distributed service that needed a commit log, there would be a topic 
> with a single partition to guarantee log order. This service would use the 
> commit log to re-sync failed nodes. Kafka is generally an excellent fit for 
> such a system, but it does not expose an adequate mechanism for log cleanup 
> in such a case. With a distributed commit log, data can only be deleted when 
> the client application determines that it is no longer needed; this creates 
> completely arbitrary ranges of time and size for messages, which the existing 
> cleanup mechanisms can't handle smoothly.
> A new deletion policy based on the absolute timestamp of a message would work 
> perfectly for this case.  The client application will periodically update the 
> minimum timestamp of messages to retain, and Kafka will delete all messages 
> earlier than that timestamp using the existing log cleaner thread mechanism.
> This is based off of the work being done in KIP-32 - Add timestamps to Kafka 
> message.
> h3. Initial Approach
> https://github.com/apache/kafka/compare/trunk...bill-warshaw:KAFKA-3224



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1972: KAFKA-3224: New log deletion policy based on times...

2016-11-11 Thread bill-warshaw
Github user bill-warshaw closed the pull request at:

https://github.com/apache/kafka/pull/1972


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657841#comment-15657841
 ] 

Guozhang Wang commented on KAFKA-4322:
--

[~markshelton] Thanks for proposing this ticket. I read the PR and here are 
some meta-questions:

1. Currently we do not commit offsets for the restore consumer since we store 
the restored offset in a separate checkpoint offset file, which both indicates 
from where the restoration should start by fetching from the changelog 
topic, and serves as a flag hinting that the previous run of the instance was 
shut down cleanly, and hence the local storage engine files (e.g. RocksDB 
folders) can be re-used. Is there any particular reason that you want to 
commit offsets after restoration?

2. We are improving our metrics / logging mechanism as a major effort for 
better operation experience as well as debuggability. This will also include 
the number of records restored. So beyond this usage, I'm wondering if you have 
other common requests that would benefit from the additional callbacks?

If you feel that this is still a common feature that we should add to Kafka, 
could you add a KIP proposal 
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) 
since it contains changes to the public APIs for Kafka clients, and we can 
continue our discussion there.

BTW I have also added you to the contributor list of Apache Kafka, from now on 
you should be able to assign JIRAs to yourself.

> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Mark Shelton
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.
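For reference, a sketch of today's callback and of the extension this ticket proposes; the second interface and its method signatures are illustrative only, not a released API:

{code}
// The first interface paraphrases org.apache.kafka.streams.processor.StateRestoreCallback.
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// The extension proposed in this ticket, with illustrative signatures.
interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void beginRestore();                 // invoked once before the first key is restored
    void endRestore(long restoredKeys);  // invoked once after the last key, e.g. to report or commit
}
{code}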



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4355) StreamThread intermittently dies with "Topic not found during partition assignment" when broker restarted

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-4355:
---

Assignee: Eno Thereska  (was: Guozhang Wang)

> StreamThread intermittently dies with "Topic not found during partition 
> assignment" when broker restarted
> -
>
> Key: KAFKA-4355
> URL: https://issues.apache.org/jira/browse/KAFKA-4355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.0.0
> Environment: kafka 0.10.0.0
> kafka 0.10.1.0
> uname -a
> Linux lp02485 4.4.0-34-generic #53~14.04.1-Ubuntu SMP Wed Jul 27 16:56:40 UTC 
> 2016 x86_64 x86_64 x86_64 GNU/Linux
> java -version
> java version "1.8.0_92"
> Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
>Reporter: Michal Borowiecki
>Assignee: Eno Thereska
>
> When (a) starting kafka streams app before the broker or
> (b) restarting the broker while the streams app is running:
> the stream thread intermittently dies with "Topic not found during partition 
> assignment" StreamsException.
> This happens roughly between one in 5 and one in 10 times.
> Stack trace:
> {noformat}
> Exception in thread "StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Topic not found during 
> partition assignment: scheduler
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.maxNumPartitions(DefaultPartitionGrouper.java:81)
>   at 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper.partitionGroups(DefaultPartitionGrouper.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:370)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:313)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:467)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:88)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:395)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> {noformat}
> Our app has 2 streams in it, consuming from 2 different topics.
> Sometimes the exception happens on both stream threads. Sometimes only on one 
> of the stream threads.
> The exception is preceded by:
> {noformat}
> [2016-10-28 16:17:55,239] INFO [StreamThread-2] (Re-)joining group 
> pool-scheduler 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2016-10-28 16:17:55,240] INFO [StreamThread-2] Marking the coordinator 
> lp02485.openbet:1937

[jira] [Commented] (KAFKA-2076) Add an API to new consumer to allow user get high watermark of partitions.

2016-11-11 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657823#comment-15657823
 ] 

Jeff Widman commented on KAFKA-2076:


It'd be great to see this move forward.

> Add an API to new consumer to allow user get high watermark of partitions.
> --
>
> Key: KAFKA-2076
> URL: https://issues.apache.org/jira/browse/KAFKA-2076
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case that user wants to know how far it is behind a particular 
> partition on startup. Currently in each fetch response, we have high 
> watermark for each partition, we only keep a global max-lag metric. It would 
> be better that we keep a record of high watermark per partition and update it 
> on each fetch response. We can add a new API to let user query the high 
> watermark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1685

2016-11-11 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: Extend mirror maker test to include interceptors

--
[...truncated 14337 lines...]
org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowWhenKeyIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyWithMergedStreams STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyWithMergedStreams PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldNotThrowNPEWhenOnChangeNotCalled STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldNotThrowNPEWhenOnChangeNotCalled PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNull STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNotAvailableWhenClusterIsEmpty STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNotAvailableWhenClusterIsEmpty PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist 
STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNullOnGetWithKeyWhenStoreDoesntExist STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNullOnGetWithKeyWhenStoreDoesntExist PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStreamPartitionerIsNull STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStreamPartitionerIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyAndCustomPartitioner STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyAndCustomPartitioner PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHaveCompactionPropSetIfSupplied STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHaveCompactionPropSetIfSupplied PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldThrowIfNameIsNull STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldThrowIfNameIsNull PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldNotBeCompactedWhenCleanupPolicyIsDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldNotBeCompactedWhenCleanupPolicyIsDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHavePropertiesSuppliedByUser STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHavePropertiesSuppliedByUser PASSED

or

[jira] [Updated] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4322:
-
Assignee: Mark Shelton  (was: Guozhang Wang)

> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Mark Shelton
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2016-11-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657814#comment-15657814
 ] 

Guozhang Wang commented on KAFKA-3729:
--

[~bharatviswa] are you still working on this issue?

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Bharat Viswanadham
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk8 #1036

2016-11-11 Thread Apache Jenkins Server
See 



[jira] [Commented] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing

2016-11-11 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657795#comment-15657795
 ] 

Jiangjie Qin commented on KAFKA-4398:
-

[~huxi_2b] I am not sure I understand the issue here. So in the log the messages 
are the following:
(offset=1, timestamp=T1)
(offset=2, timestamp=T3)
(offset=3, timestamp=T2.5)

In the time index, the index entries would be:
(T1 -> 1)
(T3 -> 2)

In this case, if the consumer searches for timestamp T2.5, offset 2 (i.e. T3) is 
expected to be returned, because the lookup returns the first offset of the message 
whose timestamp is greater than or equal to T2.5. This guarantees that all the 
messages after the target timestamp will be consumed. If we return offset 3 in 
this case, message 2, whose timestamp is T3 (which is greater than T2.5), will 
not be consumed, right?
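In other words, the lookup contract is "first offset whose timestamp is greater than or equal to the target". A minimal sketch of the call, using the partition from the example above (assumed here to be partition 0) and a caller-supplied target time:

{code}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    // Seeks to the first message whose timestamp is >= targetTimestampMs.
    static void seekToTime(KafkaConsumer<byte[], byte[]> consumer, long targetTimestampMs) {
        TopicPartition tp = new TopicPartition("tp", 0); // topic name taken from the example
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, targetTimestampMs));
        OffsetAndTimestamp found = result.get(tp);
        if (found != null) {
            // With the log above and a target of T2.5, this is offset 2 (timestamp T3), not offset 3.
            consumer.seek(tp, found.offset());
        }
    }
}
{code}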

> offsetsForTimes returns false starting offset when timestamp of messages are 
> not monotonically increasing
> -
>
> Key: KAFKA-4398
> URL: https://issues.apache.org/jira/browse/KAFKA-4398
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: huxi
>Assignee: huxi
>
> After a code walk-through for KIP-33 (Add a time based log index), I found a 
> use case where the method 'offsetsForTimes' fails to return the correct offset if 
> a series of messages is created without monotonically increasing 
> timestamps (CreateTime is used).
> Say T0 is the hour when the first message is created. Tn means the (T+n)th 
> hour. Then, I created another two messages at T1 and T3 respectively. At this 
> moment, the .timeindex should contain two items:
> T1 --->  1
> T3 --->  2  (whether it contains T0 does not matter to this problem)
> Later, for some reason, I want to insert a third message in between T1 and 
> T3, say at T2.5, but the time index file does not get changed because of the 
> restriction that timestamps must be monotonically increasing within each segment.
> After generating the message with T2.5, I invoke 
> KafkaConsumer.offsetsForTimes("tp" -> T2.5), hoping to get the first offset 
> with timestamp greater than or equal to T2.5, which should be the third message in 
> this case, but the consumer returns the second message with T3.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4081) Consumer API consumer new interface commitSyn does not verify the validity of offset

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657766#comment-15657766
 ] 

ASF GitHub Bot commented on KAFKA-4081:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1827


> Consumer API consumer new interface commitSyn does not verify the validity of 
> offset
> 
>
> Key: KAFKA-4081
> URL: https://issues.apache.org/jira/browse/KAFKA-4081
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: lifeng
>Assignee: Mickael Maison
> Fix For: 0.10.2.0
>
>
> The new consumer API's commitSync synchronously updates offsets, but it returns 
> successfully even for an illegal offset (offset < 0 or offset > high watermark).
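For reference, this is the commitSync variant in question, which takes caller-supplied offsets; a minimal sketch with a placeholder topic name:

{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ExplicitCommitSketch {
    // Commits an explicit, caller-supplied offset for a single partition.
    static void commitExplicit(KafkaConsumer<byte[], byte[]> consumer, long offset) {
        TopicPartition tp = new TopicPartition("some-topic", 0); // placeholder topic
        // Per this ticket, an invalid offset (e.g. a negative one) used to be
        // reported as a successful commit instead of being rejected.
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset)));
    }
}
{code}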



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1827: KAFKA-4081: Consumer API consumer new interface co...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1827


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4081) Consumer API consumer new interface commitSyn does not verify the validity of offset

2016-11-11 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-4081.

   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 1827
[https://github.com/apache/kafka/pull/1827]

> Consumer API consumer new interface commitSyn does not verify the validity of 
> offset
> 
>
> Key: KAFKA-4081
> URL: https://issues.apache.org/jira/browse/KAFKA-4081
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: lifeng
>Assignee: Mickael Maison
> Fix For: 0.10.2.0
>
>
> The new consumer API's commitSync synchronously updates offsets, but it returns 
> successfully even for an illegal offset (offset < 0 or offset > high watermark).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3994) Deadlock between consumer heartbeat expiration and offset commit.

2016-11-11 Thread mjuarez (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657721#comment-15657721
 ] 

mjuarez commented on KAFKA-3994:


Just tried this patch against the stock 0.10.1.0 branch, and we're still seeing 
this deadlock in our staging clusters. We don't yet have a thread dump taken while 
the deadlock was happening, but we'll be sure to add it to this ticket as soon 
as we get it.

Relevant mailing list thread here: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3cCALaekbxEs4GMp3Qvc4PJPHeanCtq4RSf1HZE7qf6OV=_jjd...@mail.gmail.com%3e

> Deadlock between consumer heartbeat expiration and offset commit.
> -
>
> Key: KAFKA-3994
> URL: https://issues.apache.org/jira/browse/KAFKA-3994
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.1
>
>
> I got the following stacktraces from ConsumerBounceTest
> {code}
> ...
> "Test worker" #12 prio=5 os_prio=0 tid=0x7fbb28b7f000 nid=0x427c runnable 
> [0x7fbb06445000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0003d48bcbc0> (a sun.nio.ch.Util$2)
> - locked <0x0003d48bcbb0> (a 
> java.util.Collections$UnmodifiableSet)
> - locked <0x0003d48bbd28> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:411)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1054)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:103)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:3

[GitHub] kafka pull request #2103: KAFKA-4379: Remove caching of dirty and removed ke...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2103


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4379) Remove caching of dirty and removed keys from StoreChangeLogger

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657717#comment-15657717
 ] 

ASF GitHub Bot commented on KAFKA-4379:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2103


> Remove caching of dirty and removed keys from StoreChangeLogger
> ---
>
> Key: KAFKA-4379
> URL: https://issues.apache.org/jira/browse/KAFKA-4379
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The StoreChangeLogger currently keeps a cache of dirty and removed keys and 
> will batch the changelog records such that we don't send a record for each 
> update. However, with KIP-63 this is unnecessary as the batching and 
> de-duping is done by the caching layer. Further, the StoreChangeLogger relies 
> on context.timestamp() which is likely to be incorrect when caching is enabled



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4379) Remove caching of dirty and removed keys from StoreChangeLogger

2016-11-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4379:
-
   Resolution: Fixed
Fix Version/s: (was: 0.10.1.1)
   0.10.2.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2103
[https://github.com/apache/kafka/pull/2103]

> Remove caching of dirty and removed keys from StoreChangeLogger
> ---
>
> Key: KAFKA-4379
> URL: https://issues.apache.org/jira/browse/KAFKA-4379
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> The StoreChangeLogger currently keeps a cache of dirty and removed keys and 
> will batch the changelog records such that we don't send a record for each 
> update. However, with KIP-63 this is unnecessary as the batching and 
> de-duping is done by the caching layer. Further, the StoreChangeLogger relies 
> on context.timestamp() which is likely to be incorrect when caching is enabled



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-11 Thread Becket Qin
Hey Michael,

The way Kafka implements backwards compatibility is to let the brokers
support old protocols. So the brokers have to support older clients that do
not understand the new attribute bit. That means we will not be able to move
away from treating the null value as a tombstone. So we need a good definition of
whether we should treat it as a tombstone or not. This is why I think we
need a magic value bump.

My confusion on the second stage is exactly about "we want to end up just
supporting tombstone marker (not both)", or in the original statement: "2)
In second stage we move on to supporting only the attribute flag for log
compaction." - We will always support the null value as a tombstone as long
as the message format version (i.e. magic byte) is less than 2 for the
reason I mentioned above.
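(For the pre-magic-2 formats under discussion, a tombstone can only be expressed as a record with a null value, roughly as in the sketch below with placeholder topic and connection settings; the point above is that this interpretation has to remain valid for as long as those formats are supported.)

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // A null value (not a zero-length one) marks the key for deletion during compaction.
        producer.send(new ProducerRecord<String, String>("compacted-topic", "some-key", null));
        producer.close();
    }
}
{code}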

Although the way to interpret the message bytes at produce time can be
inferred from the ProduceRequest version, this version information
won't be stored on the broker. So when an old consumer comes, we need to
decide whether we have to do down-conversion or not. With an explicit
message format version config (i.e. magic value), we know for
sure when to down-convert the messages to adapt to older clients. Otherwise
we would have to always scan all the messages. That would probably work, but it
relies on guesswork or inference.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 11, 2016 at 8:42 AM, Mayuresh Gharat  wrote:

> Sounds good Michael.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce 
> wrote:
>
> > @Mayuresh i don't think you've missed anything -
> >
> > as per earlier in the discussion.
> >
> > We're providing new api versions, but not planning on bumping the magic
> > number as there is no structural changes, we are simply using up a new
> > attribute bit (as like adding new compression support just uses up
> > additional attribute bits)
> >
> >
> > I think also difference between null vs 0 length byte array is covered.
> >
> > @Becket,
> >
> > The two stage approach is because we want to end up just supporting
> > tombstone marker (not both)
> >
> > But we accept we need to allow organisations and systems a period of
> > transition (this is what stage 1 provides)
> >
> >
> >
> >
> > 
> > From: Mayuresh Gharat 
> > Sent: Thursday, November 10, 2016 8:57 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
> >
> > I am not sure if we are bumping magicByte value as the KIP does not
> mention
> > it (If I didn't miss anything).
> >
> > @Nacho : I did not understand what you meant by : Conversion from one to
> > the other may be possible but I would rather not have to do it.
> >
> > You will have to do the conversion for the scenario that Becket has
> > mentioned above where an old consumer talks to the new broker, right?
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Thu, Nov 10, 2016 at 11:54 AM, Becket Qin 
> wrote:
> >
> > > Nacho,
> > >
> > > In Kafka protocol, a negative length is null, a zero length means empty
> > > byte array.
> > >
> > > I am still confused about the two stages. It seems that the broker only
> > > needs to understand three versions of messages.
> > > 1. MagicValue=0 - no timestamp, absolute offset, no tombstone flag
> > > 2. MagicValue=1 - with timestamp, relative offsets, no tombstone flag
> > (null
> > > value = tombstone)
> > > 3. MagicValue=2 - with timestamp, relative offsets, tombstone flag
> > >
> > > We are talking about two flavors for 3:
> > > 3.1 tombstone flag set = tombstone (Allows a key with null value in the
> > > compacted topics)
> > > 3.2 tombstone flag set OR null value = tombstone ( Do not allow a key
> > with
> > > null value in the compacted topics)
> > >
> > > No matter which flavor we choose, we just need to stick to that way of
> > > interpretation, right? Why would we need a second stage?
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Nov 10, 2016 at 10:37 AM, Ignacio Solis 
> wrote:
> > >
> > > > A quick differentiation I would like to make is null is not the same
> as
> > > > size 0.
> > > >
> > > > Many times these are made equal, but they're not.  When serializing
> > data,
> > > > we have to make a choice in null values and many times these are
> > > translated
> > > > to zero length blobs. This is not really the same thing.
> > > >
> > > > From this perspective, what can Kafka represent and what is
> considered
> > a
> > > > valid value?
> > > >
> > > > If the user sends a byte array of length 0, or if the serializer
> sends
> > > > something of length 0, this should be a valid value. It is not
> kafka's
> > > job
> > > > to determine what the user is trying to send. For all we know, the
> user
> > > has
> > > > a really good compression serializer that sends 0 bytes when nothing
> > has
> > > > changed.
> > > >
> > > > If the user is allowed to send null then some behavior should be
> > defined.
> > > > However, this should 

Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread Jay Kreps
Hey Radai,

+1 on deprecating and eventually removing the old config. The intention was
absolutely bounding memory usage. I think having two ways of doing this,
one that gives a crisp bound on memory and one that is hard to reason about
is pretty confusing. I think people will really appreciate having one
config which instead lets them directly control the thing they actually
care about (memory).

I also want to second Jun's concern on the complexity of the self-GCing
memory pool. I wrote the memory pool for the producer. In that area the
pooling of messages is the single biggest factor in performance of the
client so I believed it was worth some sophistication/complexity if there
was performance payoff. All the same, the complexity of that code has made
it VERY hard to keep correct (it gets broken roughly every other time
someone makes a change). Over time I came to feel a lot less proud of my
cleverness. I learned something interesting reading your self-GCing memory
pool, but I wonder if the complexity is worth the payoff in this case?

Philosophically we've tried really hard to avoid needlessly "pluggable"
implementations. That is, when there is a temptation to give a config that
plugs in different Java classes at run time for implementation choices, we
should instead think of how to give the user the good behavior
automatically. I think the use case for configuring the GCing pool would
be if you discovered a bug in which memory leaked. But this isn't something
the user should have to think about, right? If there is a bug we should find
and fix it.

-Jay

On Fri, Nov 11, 2016 at 9:21 AM, radai  wrote:

> jun's #1 + rajini's #11 - the new config param is to enable changing the
> pool implementation class. as i said in my response to jun i will make the
> default pool impl be the simple one, and this param is to allow a user
> (more likely a dev) to change it.
> both the simple pool and the "gc pool" basically just make an
> AtomicLong.get() + (a hashmap.put for gc) call before returning a buffer.
> there is absolutely no dependency on GC times in allocating (or not). the
> extra background thread in the gc pool is forever asleep unless there are
> bugs (==leaks) so the extra cost is basically nothing (backed by
> benchmarks). let me reiterate again - ANY BUFFER ALLOCATED MUST ALWAYS BE
> RELEASED - so the gc pool should not rely on gc for reclaiming buffers. it's
> a bug detector, not a feature and is definitely not intended to hide bugs -
> the exact opposite - it's meant to expose them sooner. i've cleaned up the
> docs to avoid this confusion. i also like the fail on leak. will do.
> as for the gap between pool size and heap size - that's a valid argument.
> maybe also allow sizing the pool as a % of heap size? so queued.max.bytes =
> 100 for 1MB and queued.max.bytes = 0.25 for 25% of available heap?
>
> jun's 2.2 - queued.max.bytes + socket.request.max.bytes still holds,
> assuming the ssl-related buffers are small. the largest weakness in this
> claim has to do with decompression rather than anything ssl-related. so yes
> there is an O(#ssl connections * sslEngine packet size) component, but i
> think its small. again - decompression should be the concern.
>
> rajini's #13 - interesting optimization. the problem is there's no knowing
> in advance what the _next_ request to come out of a socket is, so this
> would mute just those sockets that are 1. mutable and 2. have a
> buffer-demanding request for which we could not allocate a buffer. downside
> is that as-is this would cause the busy-loop on poll() that the mutes were
> supposed to prevent - or code would need to be added to ad-hoc mute a
> connection that was so-far unmuted but has now generated a memory-demanding
> request?
>
>
>
> On Fri, Nov 11, 2016 at 5:02 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Radai,
> >
> > 11. The KIP talks about a new server configuration parameter
> > *memory.pool.class.name* which is not in the implementation. Is
> > it
> > still the case that the pool will be configurable?
> >
> > 12. Personally I would prefer not to have a garbage collected pool that
> > hides bugs as well. Apart from the added code complexity and extra thread
> > to handle collections, I am also concerned about the non-deterministic
> > nature of GC timings. The KIP introduces delays in processing requests
> > based on the configuration parameter *queued.max.bytes. *This in
> unrelated
> > to the JVM heap size and hence pool can be full when there is no pressure
> > on the JVM to garbage collect. The KIP does not prevent other timeouts in
> > the broker (eg. consumer session timeout) because it is relying on the
> pool
> > to be managed in a deterministic, timely manner. Since a garbage
> collected
> > pool cannot provide that guarantee, wouldn't it be better to run tests
> with
> > a GC-pool that perhaps fails with a fatal error if it encounters a buffer
> > that was not released?
> >
> > 13

Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread radai
jun's #1 + rajini's #11 - the new config param is to enable changing the
pool implementation class. as i said in my response to jun i will make the
default pool impl be the simple one, and this param is to allow a user
(more likely a dev) to change it.
both the simple pool and the "gc pool" basically just make an
AtomicLong.get() + (a hashmap.put for gc) call before returning a buffer.
there is absolutely no dependency on GC times in allocating (or not). the
extra background thread in the gc pool is forever asleep unless there are
bugs (==leaks) so the extra cost is basically nothing (backed by
benchmarks). let me reiterate again - ANY BUFFER ALLOCATED MUST ALWAYS BE
RELEASED - so the gc pool should not rely on gc for reclaiming buffers. it's
a bug detector, not a feature and is definitely not intended to hide bugs -
the exact opposite - it's meant to expose them sooner. i've cleaned up the
docs to avoid this confusion. i also like the fail on leak. will do.
as for the gap between pool size and heap size - that's a valid argument.
maybe also allow sizing the pool as a % of heap size? so queued.max.bytes =
100 for 1MB and queued.max.bytes = 0.25 for 25% of available heap?
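
for illustration, the simple (non-gc) flavor boils down to roughly this (a
rough sketch only, not the actual implementation):

{code}
// rough sketch of the simple pool: one atomic counter of available bytes,
// no background thread, no weak refs. not the actual implementation.
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleishMemoryPool {
    private final AtomicLong availableMemory;

    public SimpleishMemoryPool(long sizeBytes) {
        this.availableMemory = new AtomicLong(sizeBytes);
    }

    /** returns a buffer, or null if the pool is currently exhausted. */
    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long available = availableMemory.get();
            if (available < sizeBytes)
                return null; // caller is expected to mute the socket and retry later
            if (availableMemory.compareAndSet(available, available - sizeBytes))
                return ByteBuffer.allocate(sizeBytes);
        }
    }

    /** every buffer handed out MUST be released, otherwise capacity leaks. */
    public void release(ByteBuffer buffer) {
        availableMemory.addAndGet(buffer.capacity());
    }
}
{code}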

jun's 2.2 - queued.max.bytes + socket.request.max.bytes still holds,
assuming the ssl-related buffers are small. the largest weakness in this
claim has to do with decompression rather than anything ssl-related. so yes
there is an O(#ssl connections * sslEngine packet size) component, but i
think its small. again - decompression should be the concern.

rajini's #13 - interesting optimization. the problem is there's no knowing
in advance what the _next_ request to come out of a socket is, so this
would mute just those sockets that are 1. mutable and 2. have a
buffer-demanding request for which we could not allocate a buffer. downside
is that as-is this would cause the busy-loop on poll() that the mutes were
supposed to prevent - or code would need to be added to ad-hoc mute a
connection that was so-far unmuted but has now generated a memory-demanding
request?



On Fri, Nov 11, 2016 at 5:02 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Radai,
>
> 11. The KIP talks about a new server configuration parameter
> *memory.pool.class.name* which is not in the implementation. Is it
> still the case that the pool will be configurable?
>
> 12. Personally I would prefer not to have a garbage collected pool that
> hides bugs as well. Apart from the added code complexity and extra thread
> to handle collections, I am also concerned about the non-deterministic
> nature of GC timings. The KIP introduces delays in processing requests
> based on the configuration parameter *queued.max.bytes*. This is unrelated
> to the JVM heap size and hence pool can be full when there is no pressure
> on the JVM to garbage collect. The KIP does not prevent other timeouts in
> the broker (eg. consumer session timeout) because it is relying on the pool
> to be managed in a deterministic, timely manner. Since a garbage collected
> pool cannot provide that guarantee, wouldn't it be better to run tests with
> a GC-pool that perhaps fails with a fatal error if it encounters a buffer
> that was not released?
>
> 13. The implementation currently mutes all channels that don't have a
> receive buffer allocated. Would it make sense to mute only the channels
> that need a buffer (i.e. allow channels to read the 4-byte size that is not
> read using the pool) so that normal client connection close() is handled
> even when the pool is full? Since the extra 4-bytes may already be
> allocated for some connections, the total request memory has to take into
> account *4*numConnections* bytes anyway.
>
>
> On Thu, Nov 10, 2016 at 11:51 PM, Jun Rao  wrote:
>
> > Hi, Radai,
> >
> > 1. Yes, I am concerned about the trickiness of having to deal with weak
> > refs. I think it's simpler to just have the simple version instrumented
> > with enough debug/trace logging and do enough stress testing. Since we
> > still have queued.max.requests, one can always fall back to that if a
> > memory leak issue is identified. We could also label the feature as beta
> if
> > we don't think this is production ready.
> >
> > 2.2 I am just wondering after we fix that issue whether the claim that
> the
> > request memory is bounded by  queued.max.bytes + socket.request.max.bytes
> > is still true.
> >
> > 5. Ok, leaving the default as -1 is fine then.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 9, 2016 at 6:01 PM, radai 
> wrote:
> >
> > > Hi Jun,
> > >
> > > Thank you for taking the time to review this.
> > >
> > > 1. short version - yes, the concern is bugs, but the cost is tiny and
> > worth
> > > it, and it's a common pattern. long version:
> > >1.1 detecting these types of bugs (leaks) cannot be easily done with
> > > simple testing, but requires stress/stability tests that run for a long
> > > time (long enough to hit OOM, depending on leak size and available

[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2016-11-11 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657561#comment-15657561
 ] 

Chris Riccomini commented on KAFKA-4107:


Agreed. We've had this need as well, and have been resorting to renaming the 
connector every time. It creates a ton of garbage in the commit topic, though.



> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Ewen Cheslack-Postava
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2081: MINOR: Extend mirror maker test to include interce...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2081


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-11 Thread Mayuresh Gharat
Sounds good Michael.

Thanks,

Mayuresh

On Fri, Nov 11, 2016 at 12:48 AM, Michael Pearce 
wrote:

> @Mayuresh i don't think you've missed anything -
>
> as per earlier in the discussion.
>
> We're providing new api versions, but not planning on bumping the magic
> number as there are no structural changes, we are simply using up a new
> attribute bit (just as adding new compression support uses up
> additional attribute bits)
>
>
> I think the difference between null vs a 0 length byte array is also covered.
>
> @Becket,
>
> The two stage approach is because we want to end up just supporting
> tombstone marker (not both)
>
> But we accept we need to allow organisations and systems a period of
> transition (this is what stage 1 provides)
>
>
>
>
> 
> From: Mayuresh Gharat 
> Sent: Thursday, November 10, 2016 8:57 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> I am not sure if we are bumping magicByte value as the KIP does not mention
> it (If I didn't miss anything).
>
> @Nacho : I did not understand what you meant by : Conversion from one to
> the other may be possible but I would rather not have to do it.
>
> You will have to do the conversion for the scenario that Becket has
> mentioned above where an old consumer talks to the new broker, right?
>
> Thanks,
>
> Mayuresh
>
>
> On Thu, Nov 10, 2016 at 11:54 AM, Becket Qin  wrote:
>
> > Nacho,
> >
> > In Kafka protocol, a negative length is null, a zero length means empty
> > byte array.
> >
> > I am still confused about the two stages. It seems that the broker only
> > needs to understand three versions of messages.
> > 1. MagicValue=0 - no timestamp, absolute offset, no tombstone flag
> > 2. MagicValue=1 - with timestamp, relative offsets, no tombstone flag
> (null
> > value = tombstone)
> > 3. MagicValue=2 - with timestamp, relative offsets, tombstone flag
> >
> > We are talking about two flavors for 3:
> > 3.1 tombstone flag set = tombstone (Allows a key with null value in the
> > compacted topics)
> > 3.2 tombstone flag set OR null value = tombstone ( Do not allow a key
> with
> > null value in the compacted topics)
> >
> > No matter which flavor we choose, we just need to stick to that way of
> > interpretation, right? Why would we need a second stage?
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Nov 10, 2016 at 10:37 AM, Ignacio Solis  wrote:
> >
> > > A quick differentiation I would like to make is null is not the same as
> > > size 0.
> > >
> > > Many times these are made equal, but they're not.  When serializing
> data,
> > > we have to make a choice in null values and many times these are
> > translated
> > > to zero length blobs. This is not really the same thing.
> > >
> > > From this perspective, what can Kafka represent and what is considered
> a
> > > valid value?
> > >
> > > If the user sends a byte array of length 0, or if the serializer sends
> > > something of length 0, this should be a valid value. It is not kafka's
> > job
> > > to determine what the user is trying to send. For all we know, the user
> > has
> > > a really good compression serializer that sends 0 bytes when nothing
> has
> > > changed.
> > >
> > > If the user is allowed to send null then some behavior should be
> defined.
> > > However, this should semantically be different than sending a command.
> It
> > > is possible for a null value could signify some form of delete, like
> > > "delete all messages with this key". However, if kafka has a goal to
> > write
> > > good, readable code, then this should not be allowed.
> > >
> > > A delete or a purge is a call that can have certain side effects or
> > > encounter errors that are unrelated to a send call.
> > >
> > > A couple of bullets from the Kafka style guide (
> > > http://kafka.apache.org/coding-guide.html ):
> > >
> > > - Clear code is preferable to comments. When possible make your naming
> so
> > > good you don't need comments.
> > > - Logging, configuration, and public APIs are our "UI". Make them
> pretty,
> > > consistent, and usable.
> > >
> > > If you want sendTombstone() functionality make the protocol reflect
> that.
> > >
> > > Right now we're struggling because we did not have a clear separation
> of
> > > concerns.
> > >
> > > I don't have a good solution for deployment. I'm in favor of a harsh
> > road:
> > > - Add the flag/variable/header for tombstone
> > > - Add an API for tombstone behavior if needed
> > > - Error if somebody tries to API send null
> > > - Accept if somebody tries to API send byte[] length 0
> > > - Rev broker version
> > > - broker accept flag/variable/header as tombstone
> > > - broker accept zero length values as normal messages
> > >
> > > The more gentle road would be:
> > > - Add the flag/variable/header for tombstone
> > > - Add an API for tombstone behavior if needed
> > > - warn/deprecate/etc if somebody sends null
> > > - broker accepts flags/variable/headers and zero length values

Build failed in Jenkins: kafka-0.10.1-jdk7 #85

2016-11-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: failing to close this iterator causes leaks in rocksdb

--
[...truncated 12159 lines...]
org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testSet STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testSet PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testClear STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testClear PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testRemove STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testRemove PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testMoveToEnd STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testMoveToEnd PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testUpdateAndMoveToEnd 
STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testUpdateAndMoveToEnd 
PASSED

org.apache.kafka.common.internals.PartitionStatesTest > testPartitionValues 
STARTED

org.apache.kafka.common.internals.PartitionStatesTest > testPartitionValues 
PASSED

org.apache.kafka.common.utils.UtilsTest > testAbs STARTED

org.apache.kafka.common.utils.UtilsTest > testAbs PASSED

org.apache.kafka.common.utils.UtilsTest > testMin STARTED

org.apache.kafka.common.utils.UtilsTest > testMin PASSED

org.apache.kafka.common.utils.UtilsTest > testJoin STARTED

org.apache.kafka.common.utils.UtilsTest > testJoin PASSED

org.apache.kafka.common.utils.UtilsTest > testReadBytes STARTED

org.apache.kafka.common.utils.UtilsTest > testReadBytes PASSED

org.apache.kafka.common.utils.UtilsTest > testGetHost STARTED

org.apache.kafka.common.utils.UtilsTest > testGetHost PASSED

org.apache.kafka.common.utils.UtilsTest > testGetPort STARTED

org.apache.kafka.common.utils.UtilsTest > testGetPort PASSED

org.apache.kafka.common.utils.UtilsTest > testCloseAll STARTED

org.apache.kafka.common.utils.UtilsTest > testCloseAll PASSED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress STARTED

org.apache.kafka.common.utils.UtilsTest > testFormatAddress PASSED

org.apache.kafka.common.utils.CrcTest > testUpdateInt STARTED

org.apache.kafka.common.utils.CrcTest > testUpdateInt PASSED

org.apache.kafka.common.utils.CrcTest > testUpdate STARTED

org.apache.kafka.common.utils.CrcTest > testUpdate PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testEmptyIterator PASSED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator STARTED

org.apache.kafka.common.utils.AbstractIteratorTest > testIterator PASSED
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

  ^


Build failed in Jenkins: kafka-trunk-jdk7 #1684

2016-11-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: failing to close this iterator causes leaks in rocksdb

--
[...truncated 14337 lines...]
org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowWhenKeyIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyWithMergedStreams STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyWithMergedStreams PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldNotThrowNPEWhenOnChangeNotCalled STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldNotThrowNPEWhenOnChangeNotCalled PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNull STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNotAvailableWhenClusterIsEmpty STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNotAvailableWhenClusterIsEmpty PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist 
STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNullOnGetWithKeyWhenStoreDoesntExist STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldReturnNullOnGetWithKeyWhenStoreDoesntExist PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStreamPartitionerIsNull STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStreamPartitionerIsNull PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyAndCustomPartitioner STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldGetInstanceWithKeyAndCustomPartitioner PASSED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore STARTED

org.apache.kafka.streams.processor.internals.StreamsMetadataStateTest > 
shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHaveCompactionPropSetIfSupplied STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHaveCompactionPropSetIfSupplied PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldThrowIfNameIsNull STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldThrowIfNameIsNull PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldNotBeCompactedWhenCleanupPolicyIsDelete STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldNotBeCompactedWhenCleanupPolicyIsDelete PASSED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHavePropertiesSuppliedByUser STARTED

org.apache.kafka.streams.processor.internals.InternalTopicConfigTest > 
shouldHavePropertiesSuppliedByUs

Build failed in Jenkins: kafka-trunk-jdk8 #1035

2016-11-11 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: failing to close this iterator causes leaks in rocksdb

--
[...truncated 3872 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOf

[jira] [Commented] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing

2016-11-11 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657350#comment-15657350
 ] 

huxi commented on KAFKA-4398:
-

[~becket_qin] Does it make any sense?

> offsetsForTimes returns false starting offset when timestamp of messages are 
> not monotonically increasing
> -
>
> Key: KAFKA-4398
> URL: https://issues.apache.org/jira/browse/KAFKA-4398
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.0
>Reporter: huxi
>Assignee: huxi
>
> After a code walk-through for KIP-33 (Add a time based log index), I found a 
> use case where the method 'offsetsForTimes' fails to return the correct 
> offset if a series of messages is created without monotonically increasing 
> timestamps (CreateTime is used).
> Say T0 is the hour when the first message is created, and Tn means n hours 
> after T0. Then, I created another two messages at T1 and T3 respectively. At 
> this moment, the .timeindex should contain two items:
> T1 ---> 1
> T3 ---> 2  (whether it contains T0 does not matter to this problem)
> Later, due to some reason, I want to insert a third message in between T1 and 
> T3, say T2.5, but the time index file does not change because of the 
> constraint that timestamps must be monotonically increasing within each segment.
> After generating the message with T2.5, I invoke 
> KafkaConsumer.offsetsForTimes("tp" -> T2.5), hoping to get the first offset 
> with a timestamp greater than or equal to T2.5, which should be the third 
> message in this case, but the consumer returns the second message with T3.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4398) offsetsForTimes returns false starting offset when timestamp of messages are not monotonically increasing

2016-11-11 Thread huxi (JIRA)
huxi created KAFKA-4398:
---

 Summary: offsetsForTimes returns false starting offset when 
timestamp of messages are not monotonically increasing
 Key: KAFKA-4398
 URL: https://issues.apache.org/jira/browse/KAFKA-4398
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 0.10.1.0
Reporter: huxi
Assignee: huxi


After a code walk-through for KIP-33 (Add a time based log index), I found a use 
case where the method 'offsetsForTimes' fails to return the correct offset if a 
series of messages is created without monotonically increasing timestamps 
(CreateTime is used).

Say T0 is the hour when the first message is created, and Tn means n hours after 
T0. Then, I created another two messages at T1 and T3 respectively. At this 
moment, the .timeindex should contain two items:
T1 ---> 1
T3 ---> 2  (whether it contains T0 does not matter to this problem)

Later, due to some reason, I want to insert a third message in between T1 and 
T3, say T2.5, but the time index file does not change because of the constraint 
that timestamps must be monotonically increasing within each segment.

After generating the message with T2.5, I invoke KafkaConsumer.offsetsForTimes("tp" 
-> T2.5), hoping to get the first offset with a timestamp greater than or equal to 
T2.5, which should be the third message in this case, but the consumer returns the 
second message with T3.
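
The lookup in question is roughly the following (minimal sketch; the topic name
"tp", partition 0 and the timestamp variable are placeholders):

{code}
// Minimal sketch of the lookup described above; "tp", partition 0 and the
// timestamp variable are made-up placeholders.
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesLookup {
    static void lookup(KafkaConsumer<byte[], byte[]> consumer, long t2dot5Millis) {
        TopicPartition partition = new TopicPartition("tp", 0);
        Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(partition, t2dot5Millis));
        OffsetAndTimestamp found = result.get(partition);
        if (found != null) {
            // Expected: the offset of the T2.5 message; observed: the offset of
            // the following T3 message instead.
            System.out.println("offset=" + found.offset() + ", timestamp=" + found.timestamp());
        }
    }
}
{code}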



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3835) Streams is creating two ProducerRecords for each send via RecordCollector

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3835:

Assignee: (was: Guozhang Wang)

> Streams is creating two ProducerRecords for each send via RecordCollector
> -
>
> Key: KAFKA-3835
> URL: https://issues.apache.org/jira/browse/KAFKA-3835
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> The RecordCollector.send(..) method below, currently receives a 
> ProducerRecord from its caller and then creates another one to forward on to 
> its producer.  The creation of 2 ProducerRecords should be removed.
> {code}
> public  void send(ProducerRecord record, Serializer 
> keySerializer, Serializer valueSerializer,
> StreamPartitioner partitioner)
> {code}
> We could replace the above method with
> {code}
> public  void send(String topic,
> K key,
> V value,
> Integer partition,
> Long timestamp,
> Serializer keySerializer,
> Serializer valueSerializer,
> StreamPartitioner partitioner)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4304:

Assignee: (was: Guozhang Wang)

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3262) Make KafkaStreams debugging friendly

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-3262.
-
Resolution: Duplicate

> Make KafkaStreams debugging friendly
> 
>
> Key: KAFKA-3262
> URL: https://issues.apache.org/jira/browse/KAFKA-3262
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Eno Thereska
>  Labels: user-experience
> Fix For: 0.10.2.0
>
>
> Current KafkaStreams polls records in the same thread as the data processing 
> thread. This makes debugging user code, as well as KafkaStreams itself, 
> difficult. When the thread is suspended by the debugger, the next heartbeat 
> of the consumer tied to the thread won't be sent until the thread is resumed. 
> This often results in missed heartbeats and causes a group rebalance. So it 
> may well be a completely different context when the thread hits the break 
> point the next time.
> We should consider using separate threads for polling and processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3534) Deserialize on demand when default time extractor used

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3534:

Assignee: (was: Guozhang Wang)

> Deserialize on demand when default time extractor used
> --
>
> Key: KAFKA-3534
> URL: https://issues.apache.org/jira/browse/KAFKA-3534
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.9.0.1
>Reporter: Michael Coon
>Priority: Minor
>  Labels: performance
>
> When records are added to the RecordQueue, they are deserialized at that time 
> in order to extract the timestamp. But for some data flows where large 
> messages are consumed (particularly compressed messages), this can result in 
> large spikes in memory as all messages must be deserialized prior to 
> processing (and can lead to running out of memory). An optimization might be to only 
> require deserialization at this stage if a non-default timestamp extractor is 
> being used.
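
A sketch of the proposed optimization (hypothetical helper, not the actual
RecordQueue code; it assumes the single-argument extract() signature of this
Kafka version):

{code}
// Hypothetical sketch of the optimization described above; not actual Streams code.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class LazyTimestampSketch {
    static long timestampOf(ConsumerRecord<byte[], byte[]> raw,
                            TimestampExtractor extractor,
                            boolean defaultExtractorInUse,
                            Deserializer<Object> keyDeserializer,
                            Deserializer<Object> valueDeserializer) {
        if (defaultExtractorInUse) {
            // The default extractor only needs the record's own timestamp, so the
            // (possibly large, compressed) payload does not have to be deserialized yet.
            return raw.timestamp();
        }
        // A custom extractor may need the deserialized key/value.
        Object key = keyDeserializer.deserialize(raw.topic(), raw.key());
        Object value = valueDeserializer.deserialize(raw.topic(), raw.value());
        return extractor.extract(new ConsumerRecord<>(raw.topic(), raw.partition(),
                raw.offset(), key, value));
    }
}
{code}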



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3625:

Assignee: (was: Guozhang Wang)

> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Priority: Minor
>  Labels: user-experience
> Fix For: 0.10.2.0
>
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3835) Streams is creating two ProducerRecords for each send via RecordCollector

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3835:

Labels: newbie  (was: )

> Streams is creating two ProducerRecords for each send via RecordCollector
> -
>
> Key: KAFKA-3835
> URL: https://issues.apache.org/jira/browse/KAFKA-3835
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.2.0
>
>
> The RecordCollector.send(..) method below, currently receives a 
> ProducerRecord from its caller and then creates another one to forward on to 
> its producer.  The creation of 2 ProducerRecords should be removed.
> {code}
> public  void send(ProducerRecord record, Serializer 
> keySerializer, Serializer valueSerializer,
> StreamPartitioner partitioner)
> {code}
> We could replace the above method with
> {code}
> public  void send(String topic,
> K key,
> V value,
> Integer partition,
> Long timestamp,
> Serializer keySerializer,
> Serializer valueSerializer,
> StreamPartitioner partitioner)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4199) When a window store segment is dropped we should also clear any corresponding cached entries

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4199:

Assignee: (was: Guozhang Wang)

> When a window store segment is dropped we should also clear any corresponding 
> cached entries
> 
>
> Key: KAFKA-4199
> URL: https://issues.apache.org/jira/browse/KAFKA-4199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Priority: Minor
>
> In KIP-63 we introduced a CachingWindowStore, but it currently doesn't have a 
> way to be informed when the underlying store drops a segment. In an ideal 
> world, when a segment is dropped we'd also remove the corresponding entries 
> from the cache. 
> Firstly, we need to understand if it is an issue if they don't get dropped. 
> They will naturally be evicted when the cache becomes full, but this could 
> impact other stores in the thread, i.e., what, if any, performance impact 
> exists?
> If we find there is an unacceptable performance penalty we might need to add 
> a callback to the WindowStore API such that we can be notified when segments 
> are removed
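
A sketch of the kind of callback that might be added (purely hypothetical;
the interface and method names are made up):

{code}
// Purely hypothetical sketch of a segment-drop notification on the window store API.
public interface SegmentDropListener {
    /** Called when the underlying store drops a segment covering [segmentStartMs, segmentEndMs). */
    void onSegmentDropped(long segmentStartMs, long segmentEndMs);
}
{code}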



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3262) Make KafkaStreams debugging friendly

2016-11-11 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657242#comment-15657242
 ] 

Eno Thereska commented on KAFKA-3262:
-

This is now fixed with KIP-62

> Make KafkaStreams debugging friendly
> 
>
> Key: KAFKA-3262
> URL: https://issues.apache.org/jira/browse/KAFKA-3262
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Eno Thereska
>  Labels: user-experience
> Fix For: 0.10.2.0
>
>
> Current KafkaStreams polls records in the same thread as the data processing 
> thread. This makes debugging user code, as well as KafkaStreams itself, 
> difficult. When the thread is suspended by the debugger, the next heartbeat 
> of the consumer tied to the thread won't be sent until the thread is resumed. 
> This often results in missed heartbeats and causes a group rebalance. So it 
> may well be a completely different context when the thread hits the break 
> point the next time.
> We should consider using separate threads for polling and processing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3705) Support non-key joining in KTable

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3705:

Assignee: (was: Liquan Pei)

> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api
> Fix For: 0.10.2.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> make the two joining streams be on the same key. This is a drawback in 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3739) Add no-arg constructor for library provided serdes

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-3739:

Assignee: (was: Liquan Pei)

> Add no-arg constructor for library provided serdes
> --
>
> Key: KAFKA-3739
> URL: https://issues.apache.org/jira/browse/KAFKA-3739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: newbie, user-experience
>
> We need to add the no-arg constructor explicitly for those library-provided 
> serdes such as {{WindowedSerde}} that already have constructors with 
> arguments. Otherwise they cannot be used through configs, which expect 
> to construct them via reflection with no-arg constructors.
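
A minimal sketch of the pattern (the class below is illustrative only, not the
actual WindowedSerde):

{code}
// Illustrative sketch: a serde that keeps an explicit no-arg constructor so it
// can be instantiated via reflection from configs.
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class ExampleWrappingSerde implements Serde<String> {
    private final Serde<String> inner;

    public ExampleWrappingSerde() {            // required for reflection-based construction
        this(Serdes.String());
    }

    public ExampleWrappingSerde(Serde<String> inner) {
        this.inner = inner;
    }

    @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); }
    @Override public void close() { inner.close(); }
    @Override public Serializer<String> serializer() { return inner.serializer(); }
    @Override public Deserializer<String> deserializer() { return inner.deserializer(); }
}
{code}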



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2122: failing to close this iterator causes leaks in roc...

2016-11-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2122


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-11 Thread Rajini Sivaram
I think all the comments and suggestions on this thread have now been
incorporated into the KIP. If there are no objections, I will start the
voting process on Monday.

Regards,

Rajini

On Tue, Nov 8, 2016 at 9:20 PM, Rajini Sivaram  wrote:

> Jun,
>
> Have added a sub-section on delegation token support to the KIP.
>
> Thank you,
>
> Rajini
>
> On Tue, Nov 8, 2016 at 8:07 PM, Jun Rao  wrote:
>
>> Hi, Rajini,
>>
>> That makes sense. Could you document this potential future extension in
>> the
>> KIP?
>>
>> Jun
>>
>> On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
>> rajinisiva...@googlemail.com> wrote:
>>
>> > Jun,
>> >
>> > 11. SCRAM messages have an optional extensions field which is a list of
>> > key=value pairs. We can add an extension key to the first client
>> message to
>> > indicate delegation token. Broker can then obtain credentials and
>> principal
>> > using a different code path for delegation tokens.
>> >
>> > On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:
>> >
>> > > Magnus,
>> > >
>> > > Thanks for the input. If you don't feel strongly the need to bump up
>> the
>> > > version of SaslHandshake, we can leave the version unchanged.
>> > >
>> > > Rajini,
>> > >
>> > > 11. Yes, we could send the HMAC as the SCRAM password for the
>> delegation
>> > > token. Do we need something to indicate that this SCRAM token is
>> special
>> > > (i.e., delegation token) so that we can generate the correct
>> > > KafkaPrincipal? The delegation token logic can be added later. I am
>> > asking
>> > > just so that we have enough in the design of SCRAM to add the
>> delegation
>> > > token logic later.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
>> > > rajinisiva...@googlemail.com
>> > > > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > 10. *s=* and *i=* come from the SCRAM standard
>> (they
>> > > are
>> > > > transferred during SCRAM auth). Scram messages look like (for
>> example)
>> > > > *r=,s=,i=*. StoredKey and ServerKey are
>> not
>> > > > transferred in SCRAM messages, so I picked two keys that are unused
>> in
>> > > > SCRAM.
>> > > >
>> > > > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password
>> for
>> > > > authentication along with a username and an optional
>> authorization-id.
>> > > > Kafka uses the username as the identity (Kafka principal) for
>> > > > authentication and authorization. KIP-48 doesn't mention
>> KafkaPrincipal
>> > > in
>> > > > the section "Authentication using Token", but a delegation token is
>> > > > associated with a Kafka principal. Since delegation tokens are
>> acquired
>> > > on
>> > > > behalf of a KafkaPrincipal and the principal is included in the
>> token
>> > as
>> > > > the token owner,  clients authenticating with delegation tokens
>> could
>> > use
>> > > > the token owner as username and the token HMAC as shared
>> > secret/password.
>> > > >
>> > > > If necessary, any other form of token identifier may be used as
>> > username
>> > > as
>> > > > well as long as it contains sufficient information for the broker to
>> > > > retrieve/compute the principal and HMAC for authentication. The
>> server
>> > > > callback handler can be updated when delegation tokens are
>> implemented
>> > to
>> > > > generate Kafka principal accordingly.
>> > > >
>> > > >
>> > > > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
>> > > >
>> > > > > Hi, Rajini,
>> > > > >
>> > > > > A couple of other questions on the KIP.
>> > > > >
>> > > > > 10. For the config values stored in ZK, are those keys (s, t, k,
>> i,
>> > > etc)
>> > > > > stored under scram-sha-256 standard?
>> > > > >
>> > > > > 11. Could KIP-48 (delegation token) use this KIP to send
>> delegation
>> > > > tokens?
>> > > > > In KIP-48, the client sends a HMAC as the delegation token to the
>> > > server.
>> > > > > Not sure how this gets mapped to the username/password in this
>> KIP.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
>> > > > > rajinisiva...@googlemail.com
>> > > > > > wrote:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256
>> > SASL
>> > > > > > mechanisms to Kafka:
>> > > > > >
>> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > > 84%3A+Support+SASL+SCRAM+mechanisms
>> > > > > >
>> > > > > >
>> > > > > > Comments and suggestions are welcome.
>> > > > > >
>> > > > > > Thank you...
>> > > > > >
>> > > > > > Regards,
>> > > > > >
>> > > > > > Rajini
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Regards,
>> > > >
>> > > > Rajini
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Regards,
>> >
>> > Rajini
>> >
>>
>
>
>
> --
> Regards,
>
> Rajini
>



-- 
Regards,

Rajini


[jira] [Resolved] (KAFKA-4181) Processors punctuate() methods call multiple times

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-4181.
-
Resolution: Not A Problem

> Processors punctuate() methods call multiple times
> --
>
> Key: KAFKA-4181
> URL: https://issues.apache.org/jira/browse/KAFKA-4181
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1
>Reporter: James Clinton
>Assignee: Guozhang Wang
>
> I'm seeing odd behaviour whereby a Processor's punctuate(..) method is not 
> invoked based on the schedule, but builds up a backlog of pending punctuate() 
> calls that get flushed through when the Processor's process() method is next 
> called.
> So for example:
> 1. Schedule the punctuate() methods to be called every second.  
> 2. A minute passes
> 3. Processor.process() receives a new message
> 4. Processor.punctuate(..) is invoked 60 times



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4144:

Assignee: (was: Guozhang Wang)

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.
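
For context, this is roughly how an extractor is wired up today, globally via
the config (a sketch; the payload class and its field are made up, and the
single-argument extract() signature is the one in this Kafka version):

{code}
// Sketch: a custom extractor, which today can only be configured globally for
// the whole application. "Order" and its eventTimeMs field are made-up examples.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OrderTimestampExtractor implements TimestampExtractor {

    public static class Order {               // made-up payload type
        public long eventTimeMs;
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof Order)
            return ((Order) value).eventTimeMs; // event time embedded in the payload
        return record.timestamp();              // fall back to the record's own timestamp
    }
}

// Wired up once for the whole application (the limitation described above):
//   props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//             OrderTimestampExtractor.class.getName());
{code}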



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread Rajini Sivaram
Radai,

11. The KIP talks about a new server configuration parameter
*memory.pool.class.name* which is not in the implementation. Is it
still the case that the pool will be configurable?

12. Personally I would prefer not to have a garbage collected pool that
hides bugs as well. Apart from the added code complexity and extra thread
to handle collections, I am also concerned about the non-deterministic
nature of GC timings. The KIP introduces delays in processing requests
based on the configuration parameter *queued.max.bytes*. This is unrelated
to the JVM heap size and hence the pool can be full when there is no pressure
on the JVM to garbage collect. The KIP does not prevent other timeouts in
the broker (eg. consumer session timeout) because it is relying on the pool
to be managed in a deterministic, timely manner. Since a garbage collected
pool cannot provide that guarantee, wouldn't it be better to run tests with
a GC-pool that perhaps fails with a fatal error if it encounters a buffer
that was not released?

13. The implementation currently mutes all channels that don't have a
receive buffer allocated. Would it make sense to mute only the channels
that need a buffer (i.e. allow channels to read the 4-byte size that is not
read using the pool) so that normal client connection close() is handled
even when the pool is full? Since the extra 4-bytes may already be
allocated for some connections, the total request memory has to take into
account *4*numConnections* bytes anyway.
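For readers not familiar with the weak-reference technique being debated here (point 12 above and point 1 of the quoted reply below), a toy, self-contained sketch of the idea follows. The class and method names are invented for illustration and are not the API of the patch under discussion.

{code}
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy "garbage-collected" pool: handed-out buffers are tracked only via weak
// references. If a caller drops a buffer without releasing it, the GC
// eventually enqueues its reference and sweepLeaked() can notice the leak.
public class LeakDetectingPool {
    private final ReferenceQueue<ByteBuffer> leaked = new ReferenceQueue<>();
    // Keep the WeakReferences strongly reachable, mapped to the buffer size.
    private final Map<Reference<ByteBuffer>, Integer> outstanding = new ConcurrentHashMap<>();

    public ByteBuffer allocate(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        outstanding.put(new WeakReference<>(buffer, leaked), size);
        return buffer;
    }

    // A real pool would look up and remove the matching reference here and
    // return the capacity; omitted to keep the sketch short.
    public void release(ByteBuffer buffer) { }

    // Call periodically (e.g. from a background thread) to detect buffers
    // that were garbage collected without ever being released.
    public void sweepLeaked() {
        Reference<? extends ByteBuffer> ref;
        while ((ref = leaked.poll()) != null) {
            Integer size = outstanding.remove(ref);
            if (size != null) {
                System.err.println("Possible leak: a " + size + "-byte buffer was never released");
            }
        }
    }
}
{code}

The point under debate is whether such a sweep should quietly reclaim capacity (which can hide release bugs) or fail loudly, as suggested above.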


On Thu, Nov 10, 2016 at 11:51 PM, Jun Rao  wrote:

> Hi, Radai,
>
> 1. Yes, I am concerned about the trickiness of having to deal with weak
> refs. I think it's simpler to just have the simple version instrumented
> with enough debug/trace logging and do enough stress testing. Since we
> still have queued.max.requests, one can always fall back to that if a
> memory leak issue is identified. We could also label the feature as beta if
> we don't think this is production ready.
>
> 2.2 I am just wondering after we fix that issue whether the claim that the
> request memory is bounded by  queued.max.bytes + socket.request.max.bytes
> is still true.
>
> 5. Ok, leaving the default as -1 is fine then.
>
> Thanks,
>
> Jun
>
> On Wed, Nov 9, 2016 at 6:01 PM, radai  wrote:
>
> > Hi Jun,
> >
> > Thank you for taking the time to review this.
> >
> > 1. Short version - yes, the concern is bugs, but the cost is tiny and worth
> > it, and it's a common pattern. Long version:
> >    1.1 Detecting these types of bugs (leaks) cannot be easily done with
> > simple testing, but requires stress/stability tests that run for a long
> > time (long enough to hit OOM, depending on leak size and available memory).
> > This is why some sort of leak detector is "standard practice". For example,
> > look at netty (http://netty.io/wiki/reference-counted-objects.html#leak-detection-levels) -
> > they have way more complicated built-in leak detection enabled by default.
> > As a concrete example - during development I did not properly dispose of an
> > in-progress KafkaChannel.receive when a connection was abruptly closed, and
> > I only found it because of the log msg printed by the pool.
> >    1.2 I have a benchmark suite showing the performance cost of the GC pool
> > is absolutely negligible -
> > https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks
> >    1.3 As for the complexity of the impl - it's just ~150 lines and pretty
> > straightforward. I think the main issue is that not many people are
> > familiar with weak refs and ref queues.
> >
> >    How about making the pool impl class a config param (generally good
> > going forward), make the default the simple pool, and keep the GC one as
> > a dev/debug/triage aid?
> >
> > 2. The KIP itself doesn't specifically treat SSL at all - it's an
> > implementation detail. As for my current patch, it has some minimal
> > treatment of SSL - just enough to not mute SSL sockets mid-handshake - but
> > the code in SslTransportLayer still allocates buffers itself. It is my
> > understanding that netReadBuffer/appReadBuffer shouldn't grow beyond 2 x
> > sslEngine.getSession().getPacketBufferSize(), which I assume to be small.
> > They are also long lived (they live for the duration of the connection),
> > which makes them a poor fit for pooling. The bigger fish to fry, I think, is
> > decompression - you could read a 1MB blob into a pool-provided buffer and
> > then decompress it into 10MB of heap allocated on the spot :-) Also, the
> > SSL code is extremely tricky.
> >    2.2 Just to make sure, you're talking about Selector.java: while
> > ((networkReceive = channel.read()) != null) addToStagedReceives(channel,
> > networkReceive); ? If so you're right, and I'll fix that (probably by
> > something similar to immediatelyConnectedKeys, not sure yet)
> >
> > 3. 

[jira] [Updated] (KAFKA-4182) Move the change logger out of RocksDB stores

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4182:

Assignee: (was: Guozhang Wang)

> Move the change logger out of RocksDB stores
> 
>
> Key: KAFKA-4182
> URL: https://issues.apache.org/jira/browse/KAFKA-4182
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
> Fix For: 0.10.1.1
>
>
> We currently have the change logger embedded within the RocksDB store 
> implementations; however, this results in multiple implementations of the same 
> thing and bad separation of concerns. We should create a new LoggedStore that 
> wraps the outermost store when logging is enabled, for example:
> loggedStore -> cachingStore -> meteredStore -> innerStore
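A rough sketch of the wrapping approach described above, using hypothetical interfaces (the real Kafka Streams StateStore/KeyValueStore API has many more methods and the changelog plumbing is more involved):

{code}
// Hypothetical, minimal store interface used only for this sketch.
interface SimpleStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Also hypothetical: whatever component actually appends to the changelog topic.
interface ChangeLogger<K, V> {
    void logChange(K key, V value);
}

// Decorator that adds change-logging around any inner store, so the logging
// concern lives in one place instead of inside every RocksDB-based store.
class LoggedStore<K, V> implements SimpleStore<K, V> {
    private final SimpleStore<K, V> inner;
    private final ChangeLogger<K, V> changeLogger;

    LoggedStore(SimpleStore<K, V> inner, ChangeLogger<K, V> changeLogger) {
        this.inner = inner;
        this.changeLogger = changeLogger;
    }

    @Override
    public void put(K key, V value) {
        inner.put(key, value);
        changeLogger.logChange(key, value); // record the update after the inner store accepts it
    }

    @Override
    public V get(K key) {
        return inner.get(key); // reads need no logging
    }
}
{code}

Composing the wrappers in the order shown in the ticket then yields loggedStore -> cachingStore -> meteredStore -> innerStore without each inner store re-implementing logging.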



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4186) Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey

2016-11-11 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657022#comment-15657022
 ] 

Eno Thereska commented on KAFKA-4186:
-

I haven't seen this happen. Is it still an issue?

> Transient failure in KStreamAggregationIntegrationTest.shouldGroupByKey
> ---
>
> Key: KAFKA-4186
> URL: https://issues.apache.org/jira/browse/KAFKA-4186
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> Saw this running locally off of trunk:
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest > 
> shouldGroupByKey[1] FAILED
> java.lang.AssertionError: Condition not met within timeout 6. Did not 
> receive 10 number of records
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:268)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:211)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.receiveMessages(KStreamAggregationIntegrationTest.java:480)
> at 
> org.apache.kafka.streams.integration.KStreamAggregationIntegrationTest.shouldGroupByKey(KStreamAggregationIntegrationTest.java:407)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4218) Enable access to key in {{ValueTransformer}}

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4218:

Assignee: (was: Guozhang Wang)

> Enable access to key in {{ValueTransformer}}
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
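To illustrate the workaround the ticket refers to, this is roughly what one writes today with the 0.10.x API when the key is needed: a full {{Transformer}} that returns a new {{KeyValue}} even though the key never changes (class name and logic are illustrative):

{code}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Workaround when the key is needed while transforming values: use a full
// Transformer and rebuild a KeyValue with the unchanged key, which is exactly
// the overhead this ticket would like to avoid.
public class KeyAwareValueTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // gives access to state stores, if needed
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        String newValue = key + ":" + value;  // value derived using the key
        return KeyValue.pair(key, newValue);  // key unchanged, but a new KeyValue is still allocated
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // nothing to emit on punctuation
    }

    @Override
    public void close() {}
}
{code}

It would typically be attached with {{stream.transform(KeyAwareValueTransformer::new)}}; {{transformValues}} would be the natural fit if the key were exposed there.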



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4219) Permit setting of event time in stream processor

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4219:

Assignee: (was: Guozhang Wang)

> Permit setting of event time in stream processor
> 
>
> Key: KAFKA-4219
> URL: https://issues.apache.org/jira/browse/KAFKA-4219
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>
> Event time is assigned in stream sources via {{TimestampExtractor}}.  Once 
> the event time has been assigned, it remains the same, regardless of any 
> downstream processing in the topology.  This is insufficient for many 
> processing jobs, particularly when the output of the job is written back into 
> a Kafka topic, where the record's time is encoded outside of the record's 
> value.
> For instance:
> * When performing windowed aggregations it may be desirable for the timestamp 
> of the emitted record to be the lower or upper limit of the time window, rather 
> than the timestamp of the last processed element, which may be anywhere 
> within the time window.
> * When joining two streams, it is non-deterministic which of the two records' 
> timestamps will be the timestamp of the emitted record.  It would be either 
> one depending on the order in which the records are processed.  Even were this 
> deterministic, it may be desirable for the emitted timestamp to be altogether 
> different from the timestamp of the joined records.  For instance, setting 
> the timestamp to the current processing time may be desirable.
> * In general, lower level processors may wish to set the timestamp of emitted 
> records to an arbitrary value.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4263) QueryableStateIntegrationTest.concurrentAccess is failing occasionally in jenkins builds

2016-11-11 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657015#comment-15657015
 ] 

Eno Thereska commented on KAFKA-4263:
-

Haven't seen this. Closing for now.

> QueryableStateIntegrationTest.concurrentAccess is failing occasionally in 
> jenkins builds
> 
>
> Key: KAFKA-4263
> URL: https://issues.apache.org/jira/browse/KAFKA-4263
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> We are seeing occasional failures of this test in Jenkins; however, it isn't 
> failing when running locally (confirmed by multiple people). Needs 
> investigating.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4240) Remove disableLogging from API

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4240:

Assignee: Guozhang Wang  (was: Eno Thereska)

> Remove disableLogging from API
> --
>
> Key: KAFKA-4240
> URL: https://issues.apache.org/jira/browse/KAFKA-4240
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>Assignee: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> The disableLogging API in PersistentKeyValueFactory is potentially not needed 
> since all stores should have a backing changelog for recovery. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4263) QueryableStateIntegrationTest.concurrentAccess is failing occasionally in jenkins builds

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska resolved KAFKA-4263.
-
Resolution: Not A Problem

> QueryableStateIntegrationTest.concurrentAccess is failing occasionally in 
> jenkins builds
> 
>
> Key: KAFKA-4263
> URL: https://issues.apache.org/jira/browse/KAFKA-4263
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> We are seeing occasional failures of this test in Jenkins; however, it isn't 
> failing when running locally (confirmed by multiple people). Needs 
> investigating.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4273:

Assignee: (was: Guozhang Wang)

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212
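To make the RocksDB side of this concrete, the TtlDB API linked above can open a store whose entries expire after a fixed number of seconds. A minimal standalone sketch (path and key/value are illustrative; this is plain RocksDB, not something the Streams DSL exposes today):

{code}
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

// Standalone illustration of RocksDB's TTL support (the TtlDB API linked in
// the ticket). Entries older than the TTL become eligible for removal during
// compaction.
public class TtlStoreExample {
    public static void main(String[] args) throws RocksDBException {
        int ttlSeconds = 48 * 60 * 60; // 48 hours, matching the retention period in the ticket

        Options options = new Options().setCreateIfMissing(true);
        TtlDB db = TtlDB.open(options, "/tmp/ttl-store-example", ttlSeconds, false);

        db.put("some-key".getBytes(), "some-value".getBytes());
        // once the entry is older than ttlSeconds, a compaction will drop it

        // resource cleanup (closing/disposing db and options) omitted for brevity
    }
}
{code}

Expired entries are dropped during compaction rather than at an exact deadline, which is usually acceptable for the retention use case described here.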



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4281:

Assignee: (was: Guozhang Wang)

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. To add this unfortunately a 
> large number of files needed to be touched, and this effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt-in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table but this didn't work as 
> expected because in order for the latency to go away, the KTableImpl itself 
> must also mark its source as forward immediate (otherwise we will still see 
> latency due to the materialization of the KTableSource still relying upon 
> state store flushes to propagate.)
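For completeness, the knobs that exist in 0.10.1 for trading off this latency are the commit interval and the record cache size; setting the cache to zero effectively restores pre-KIP-63 immediate forwarding, at the cost of more downstream records. A small sketch (values are illustrative):

{code}
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class LowLatencyStreamsConfig {
    public static Properties lowLatencyProps() {
        Properties props = new Properties();
        // Flush/forward more often: cached records are only pushed downstream
        // on flush, which happens at most every commit interval.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        // Or disable caching entirely so aggregation results are forwarded
        // immediately, as they were before KIP-63 (more downstream records).
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
        return props;
    }
}
{code}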



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4317) RocksDB checkpoint files lost on kill -9

2016-11-11 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4317:

Assignee: (was: Guozhang Wang)

> RocksDB checkpoint files lost on kill -9
> 
>
> Key: KAFKA-4317
> URL: https://issues.apache.org/jira/browse/KAFKA-4317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>
> Right now, the checkpoint files for logged RocksDB stores are written during 
> a graceful shutdown, and removed upon restoration. Unfortunately this means 
> that in a scenario where the process is forcibly killed, the checkpoint files 
> are not there, so all RocksDB stores are rematerialized from scratch on the 
> next launch.
> In a way, this is good, because it simulates bootstrapping a new node (for 
> example, it's a good way to see how much I/O is used to rematerialize the 
> stores); however, it leads to longer recovery times when a non-graceful 
> shutdown occurs and we want to get the job up and running again.
> There seem to be two possible things to consider:
> - Simply do not remove checkpoint files on restoring. This way a kill -9 will 
> result in only repeating the restoration of all the data generated in the 
> source topics since the last graceful shutdown.
> - Continually update the checkpoint files (perhaps on commit) -- this would 
> result in the least amount of overhead/latency in restarting, but the 
> additional complexity may not be worth it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4359) Streams integration tests should not use commit interval of 1

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657007#comment-15657007
 ] 

ASF GitHub Bot commented on KAFKA-4359:
---

GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2124

KAFKA-4359: Removed commit interval



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
KAFKA-4359-intergration-tests-commit1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2124


commit eab63b569f21797bd502c520341f5d90fbc6d9db
Author: Eno Thereska 
Date:   2016-11-11T12:51:10Z

Removed commit interval




> Streams integration tests should not use commit interval of 1
> -
>
> Key: KAFKA-4359
> URL: https://issues.apache.org/jira/browse/KAFKA-4359
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.10.1.1
>
>
> Several streams integration tests use two cache sizes, 0 and 10MB. However, 
> when they use 10MB, they still use a very small commit interval (1ms). That 
> leads to two problems: 1) a small commit interval often has the same effect as 
> having the cache size be 0, and 2) a small commit interval is not exactly the 
> same as the cache size being 0 and in some cases there is deduplication. This 
> leads to the tests failing, since they don't expect deduplication.
> To solve this issue, look at KStreamAggregationDedupIntegrationTest and 
> KStreamAggregationIntegrationTest. If you want to test dedup, it would be 
> necessary to create another file. 
> Several tests need this cleanup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

