Re: [DISCUSS] KIP-78: Cluster Id

2016-09-02 Thread Dong Lin
Hey Ismael,

Thanks for your reply. Please see my comment inline.

On Fri, Sep 2, 2016 at 8:28 PM, Ismael Juma  wrote:

> Hi Dong,
>
> Thanks for your feedback. Comments inline.
>
> On Thu, Sep 1, 2016 at 7:51 PM, Dong Lin  wrote:
> >
> > I share the view with Harsha and would like to understand how the current
> > approach of randomly generating cluster.id compares with the approach of
> > manually specifying it in meta.properties.
> >
>
> Harsha's suggestion in the thread was to store the generated id in
> meta.properties, not to manually specify it via meta.properties.
>
> >
> > I think one big advantage of defining it manually in zookeeper is that we
> > can easily tell which cluster it is by simply looking at the sensor name,
> > which makes it more useful to the auditing or monitoring use-case that this
> > KIP intends to address.
>
>
> If you really want to customise the name, it is possible with the current
> proposal: save the appropriate znode in ZooKeeper before a broker
> auto-generates it. We don't encourage that because once you have a
> meaningful name, there's a good chance that you may want to change it in
> the future. And things break down at that point. That's why we prefer
> having a generated, unique and immutable id complemented by a changeable
> human readable name. As described in the KIP, we think the latter can be
> achieved more generally via resource tags (which will be a separate KIP).
>
Can you elaborate on what will break down if we need to change the name?

Even if we cannot change the name because something would break in that
case, it still seems better to read the id from config than to use a
randomly generated ID. With my suggested solution, the user can simply choose
not to change the name and make sure there is a unique id per cluster. With
your proposal, you need to store the old cluster.id and manually restore it in
ZooKeeper in some scenarios. What do you think?


> > On the other hand, you can only tell whether two
> > sensors are measuring the same cluster or not. Also note that even this
> > goal is not easily guaranteed, because you need an external mechanism to
> > manually re-generate the znode with the old cluster.id if the znode is deleted or
> > if the same cluster (w.r.t purpose) is changed to use a different
> > zookeeper.
> >
>
> If we assume that znodes can be deleted at random, the cluster id is
> probably the least of one's worries. And yes, when moving to a
> different ZooKeeper while wanting to retain the cluster id, you would have
> to set the znode manually. This doesn't seem too onerous compared to the
> other work you will have to do for this scenario.
>
Maybe this work is not much compared to other work, but we can agree that
no work is better than a little work, right? I am interested to see whether we
can avoid the work and still meet the motivation and goals of this KIP.


> > I read your reply to Harsha but still I don't fully understand your concern
> > with that approach. I think the broker can simply register cluster.id in that
> > znode if it is not specified yet, in the same way that this KIP proposes to
> > do it, right? Can you please elaborate more about your concern with this
> > approach?
> >
>
> It's a bit difficult to answer this comment because it seems like the
> intent of your suggestion is different than Harsha's.
>
> I am not necessarily opposed to storing the cluster id in meta.properties
> (note that we have one meta.properties per log.dir), but I think there are
> a number of things that need to be discussed and I don't think we need to
> block KIP-78 while that takes place. Delivering features incrementally is a
> good thing in my opinion (KIP-31/32, KIP-33 and KIP-79 are good recent
> examples).
>

If I understand it correctly, the motivation of this KIP is to allow a cluster
to be uniquely identified. This is a useful feature and I am not asking for
anything beyond this scope. It is just that reading cluster.id from config
seems to be a better solution for meeting the motivation and all the
goals described in the KIP. More specifically, a user-specified cluster.id not
only allows users to distinguish between different clusters, it also lets them
identify a cluster. In comparison, a randomly generated cluster.id lets users
distinguish clusters only with a little more effort, and doesn't let them
identify a cluster by simply reading e.g. a sensor name. Did I miss
something here?


>
> Ismael
>
> P.S. For what it's worth, the following version of the KIP includes an
> incomplete description (it assumes a single meta.properties, but there
> could be many) of what the broker would have to do if we wanted to save to
> meta.properties and potentially restore the znode from it. The state space
> becomes a lot more complex, increasing potential for bugs (we had a few for
> generated broker ids). In contrast, the current proposal is very simple and
> doesn't prevent us from introducing the additional functionality later.
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868433

Re: [DISCUSS] KIP-78: Cluster Id

2016-09-02 Thread Ismael Juma
Hi Dong,

Thanks for your feedback. Comments inline.

On Thu, Sep 1, 2016 at 7:51 PM, Dong Lin  wrote:
>
> I share the view with Harsha and would like to understand how the current
> approach of randomly generating cluster.id compares with the approach of
> manually specifying it in meta.properties.
>

Harsha's suggestion in the thread was to store the generated id in
meta.properties, not to manually specify it via meta.properties.

>
> I think one big advantage of defining it manually in zookeeper is that we
> can easily tell which cluster it is by simply looking at the sensor name,
> which makes it more useful to the auditing or monitoring use-case that this
> KIP intends to address.


If you really want to customise the name, it is possible with the current
proposal: save the appropriate znode in ZooKeeper before a broker
auto-generates it. We don't encourage that because once you have a
meaningful name, there's a good chance that you may want to change it in
the future. And things break down at that point. That's why we prefer
having a generated, unique and immutable id complemented by a changeable
human readable name. As described in the KIP, we think the latter can be
achieved more generally via resource tags (which will be a separate KIP).
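
For completeness, pre-creating that znode would look roughly like this (a
sketch only; the znode path and JSON layout shown here are assumptions based on
the KIP draft, so please check the wiki before relying on them):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;

public class PreCreateClusterIdZnode {
    public static void main(String[] args) throws Exception {
        // Connect to the same ZooKeeper ensemble the Kafka brokers will use.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        // Assumed znode path and JSON layout -- double-check against the KIP.
        String json = "{\"version\":\"1\",\"id\":\"my-production-cluster\"}";
        zk.create("/cluster/id", json.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.close();
    }
}

If that znode already exists when the first broker starts, a KIP-78 broker
would adopt the existing id instead of generating a new one, which is the
behavior described above.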


> On the other hand, you can only tell whether two
> sensors are measuring the same cluster or not. Also note that even this
> goal is not easily guaranteed, because you need an external mechanism to
> manually re-generate the znode with the old cluster.id if the znode is deleted or
> if the same cluster (w.r.t purpose) is changed to use a different
> zookeeper.
>

If we assume that znodes can be deleted at random, the cluster id is
probably the least of one's worries. And yes, when moving to a
different ZooKeeper while wanting to retain the cluster id, you would have
to set the znode manually. This doesn't seem too onerous compared to the
other work you will have to do for this scenario.


> I read your reply to Harsha but still I don't fully understand your concern
> with that approach. I think the broker can simply register group.id in
> that
> znode if it is not specified yet, in the same way that this KIP proposes to
> do it, right? Can you please elaborate more about your concern with this
> approach?
>

It's a bit difficult to answer this comment because it seems like the
intent of your suggestion is different than Harsha's.

I am not necessarily opposed to storing the cluster id in meta.properties
(note that we have one meta.properties per log.dir), but I think there are
a number of things that need to be discussed and I don't think we need to
block KIP-78 while that takes place. Delivering features incrementally is a
good thing in my opinion (KIP-31/32, KIP-33 and KIP-79 are good recent
examples).
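
Just to make the trade-off concrete, the simple single-log.dir case would look
roughly like the sketch below; the cluster.id key is hypothetical, and the
extra complexity mentioned above comes from reconciling several log.dirs with
each other and with the znode.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.Properties;
import java.util.UUID;

public class ClusterIdFromMetaProperties {
    // Sketch: load cluster.id from a single log.dir's meta.properties,
    // generating and persisting an id if it is missing. Not part of the KIP
    // as proposed.
    static String loadOrCreateClusterId(File logDir) throws Exception {
        File file = new File(logDir, "meta.properties");
        Properties props = new Properties();
        if (file.exists()) {
            try (FileInputStream in = new FileInputStream(file)) {
                props.load(in);
            }
        }
        String clusterId = props.getProperty("cluster.id");
        if (clusterId == null) {
            clusterId = UUID.randomUUID().toString();
            props.setProperty("cluster.id", clusterId);
            try (FileOutputStream out = new FileOutputStream(file)) {
                props.store(out, "kafka meta");
            }
        }
        return clusterId;
    }
}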

Ismael

P.S. For what it's worth, the following version of the KIP includes an
incomplete description (it assumes a single meta.properties, but there
could be many) of what the broker would have to do if we wanted to save to
meta.properties and potentially restore the znode from it. The state space
becomes a lot more complex, increasing potential for bugs (we had a few for
generated broker ids). In contrast, the current proposal is very simple and
doesn't prevent us from introducing the additional functionality later.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868433


Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-02 Thread Becket Qin
Thanks for the feedback Jun, Jay and Ismael.

To answer Jun's question
> 1. In ListOffsetResponse, I am not sure if it's useful to return the
> timestamp since the user can always find the timestamp by fetching the
> message at the returned offset.

Returning the timestamp with the offset may be useful if users do not always
want to read the message afterwards. For example, a user may not want to
consume a message if its timestamp is too far away from the target timestamp.
Also, users may only want to perform some timestamp-based query without
consuming all the messages, e.g. to get the total number of messages in a time
range.
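
As a rough sketch of that last use case, assuming the proposed
offsetsForTimes() returns a map from partition to the proposed TimestampOffset
result (consumer, tp and the two timestamps are assumed to be set up
elsewhere, and the names may still change as the KIP evolves):

// Sketch against the proposed API only -- offsetsForTimes() and
// TimestampOffset do not exist in the released consumer yet.
Map<TopicPartition, TimestampOffset> begin =
    consumer.offsetsForTimes(Collections.singletonMap(tp, startTimestamp));
Map<TopicPartition, TimestampOffset> end =
    consumer.offsetsForTimes(Collections.singletonMap(tp, endTimestamp));
long approximateCount = end.get(tp).offset - begin.get(tp).offset;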

Regarding Jay's question:
> One minor thing. I think the old v0 list offsets request also gave you the
> highwater mark, it kind of shoves it in as the last thing in the array of
> offsets. This is used internally to implement seekToEnd() iirc. How would
> that work once v0 is removed?

We will preserve the behavior of the -1 (latest) and -2 (earliest) target
times, so the high watermark will still be returned if the target time is -1
(latest) in ListOffsetRequest v1. From the consumer's point of view, it will
not see messages beyond the high watermark, so "latest" is the high
watermark. It seems that KafkaConsumer.seekToEnd() is already doing that,
so the behavior won't change.

The batch processing use case also seems to be supported by
ListOffsetRequest with a -1 target time. Admittedly, if a batch job
always consumes from a begin timestamp until the current high watermark,
piggybacking the high watermark in ListOffsetResponse v1 may save one
ListOffsetRequest (if we don't piggyback the high watermark, the user needs
to issue two ListOffsetRequests, one for the begin time and the other for the
high watermark). However, it seems to make the protocol less clean.

@Ismael,
I fixed the wiki issues you pointed out. offsetsForTimes() sounds good to me.

The current offsetsForTimes() API obviously does not support querying
multiple timestamps for the same partition. It doesn't seem to be an intended
feature of ListOffsetRequest v0 either (it sounds more like a bug). My
intuition is that this is a rare use case. Given that it did not exist before
and we don't see a strong need from the community either, maybe it is better
to keep ListOffsetRequest v1 simple. We can add it later if it turns out to be
a useful feature (that may need an interface change, but I honestly do not
think people would frequently query many different timestamps for the same
partition).

Have a good long weekend!

Thanks,

Jiangjie (Becket) Qin




On Fri, Sep 2, 2016 at 6:10 PM, Ismael Juma  wrote:

> Thanks for the proposal Becket. Looks good overall, a few comments:
>
> ListOffsetResponse => [TopicName [PartitionOffsets]]
> >   PartitionOffsets => Partition ErrorCode Timestamp [Offset]
> >   Partition => int32
> >   ErrorCode => int16
> >   Timestamp => int64
> >   Offset => int
>
>
> It should be int64 for `Offset` right?
>
> Implementation wise, we will migrate to o.a.k.common.requests.ListOffsetRequest
> > class on the broker side.
>
>
> Could you clarify what you mean here? We already
> use o.a.k.common.requests.ListOffsetRequest in KafkaApis.
>
> long offset = consumer.offsetForTime(Collections.singletonMap(
> topicPartition,
> > targetTime)).offset;
>
>
> The result of `offsetForTime` is a Map, so we can't just call `offset` on
> it. You probably meant something like:
>
> long offset = consumer.offsetForTime(Collections.singletonMap(
> topicPartition,
> targetTime)).get(topicPartition).offset;
>
> Test searchByTimestamp with CreateTime and LogAppendTime
> >
>
> Do you mean `Test offsetForTime`?
>
> And:
>
> 1. In KAFKA-1588, the following issue was described "When performing an
> OffsetRequest, if you request the same topic and partition combination in a
> single request more than once (for example, if you want to get both the
> head and tail offsets for a partition in the same request), you will get a
> response for both, but they will be the same offset". Will the new request
> version support the use case where multiple timestamps are passed for the
> same topic partition? And if we do support it at the protocol level, do we
> also want to support it at the API level or do we think the additional
> complexity is not worth it?
>
> 2. Is `offsetForTime` the right method name given that we are getting
> multiple offsets? Maybe it should be `offsetsForTimes` or something like
> that.
>
> Ismael
>
> On Wed, Aug 31, 2016 at 4:38 AM, Becket Qin  wrote:
>
> > Hi Kafka devs,
> >
> > I created KIP-79 to allow consumer to precisely query the offsets based on
> > timestamp.
> >
> > In short we propose to :
> > 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> > 2. add an offsetForTime() method in new consumer.
> >
> > The KIP wiki is the following:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
> >
> > Comments are welcome.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>


Re: [DISCUSS] Remove beta label from the new Java consumer

2016-09-02 Thread Ismael Juma
Hi Jaikiran,

Thanks for your feedback. Comments inline.

On Wed, Aug 31, 2016 at 5:40 AM, Jaikiran Pai 
wrote:

> Personally, I would be OK if the beta label is removed from it if the dev
> team is sure the API isn't going to change. I don't know if that's true or
> not post 0.10.0.1.


The API has been declared stable as of 0.10.0.0. It's worth mentioning that
backwards compatible API changes can still happen.


> For me the major thing that I think needs to be addressed is these JIRAs
> which actually expose some API implementation level issues. Not sure if
> solving those issues will involve changes to API itself:
>
> https://issues.apache.org/jira/browse/KAFKA-1894
> https://issues.apache.org/jira/browse/KAFKA-3540


We think these can be fixed in a backwards compatible manner.

https://issues.apache.org/jira/browse/KAFKA-3539


This is for the producer, which hasn't been considered beta since 0.9.0.0.

Ismael


Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-02 Thread Ismael Juma
Thanks for the proposal Becket. Looks good overall, a few comments:

ListOffsetResponse => [TopicName [PartitionOffsets]]
>   PartitionOffsets => Partition ErrorCode Timestamp [Offset]
>   Partition => int32
>   ErrorCode => int16
>   Timestamp => int64
>   Offset => int


It should be int64 for `Offset` right?

Implementation wise, we will migrate to o.a.k.common.requests.ListOffsetRequest
> class on the broker side.


Could you clarify what you mean here? We already
use o.a.k.common.requests.ListOffsetRequest in KafkaApis.

long offset = consumer.offsetForTime(Collections.singletonMap(topicPartition,
> targetTime)).offset;


The result of `offsetForTime` is a Map, so we can't just call `offset` on
it. You probably meant something like:

long offset = consumer.offsetForTime(Collections.singletonMap(topicPartition,
targetTime)).get(topicPartition).offset;

Test searchByTimestamp with CreateTime and LogAppendTime
>

Do you mean `Test offsetForTime`?

And:

1. In KAFKA-1588, the following issue was described "When performing an
OffsetRequest, if you request the same topic and partition combination in a
single request more than once (for example, if you want to get both the
head and tail offsets for a partition in the same request), you will get a
response for both, but they will be the same offset". Will the new request
version support the use case where multiple timestamps are passed for the
same topic partition? And if we do support it at the protocol level, do we
also want to support it at the API level or do we think the additional
complexity is not worth it?

2. Is `offsetForTime` the right method name given that we are getting
multiple offsets? Maybe it should be `offsetsForTimes` or something like
that.

Ismael

On Wed, Aug 31, 2016 at 4:38 AM, Becket Qin  wrote:

> Hi Kafka devs,
>
> I created KIP-79 to allow consumer to precisely query the offsets based on
> timestamp.
>
> In short we propose to :
> 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> 2. add an offsetForTime() method in new consumer.
>
> The KIP wiki is the following:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


[jira] [Commented] (KAFKA-4118) StreamsSmokeTest.test_streams started failing since 18 August build

2016-09-02 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459966#comment-15459966
 ] 

Jason Gustafson commented on KAFKA-4118:


[~guozhang] I have a patch available for KAFKA-3807 which fixes the transient 
failures (I think). Unfortunately, this smoke test looks like it's still 
failing: 
http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2016-09-03--001.1472861344--hachikuji--KAFKA-3807--50c8cfb/.
 Would you mind taking a look? 

> StreamsSmokeTest.test_streams started failing since 18 August build
> ---
>
> Key: KAFKA-4118
> URL: https://issues.apache.org/jira/browse/KAFKA-4118
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ismael Juma
>Assignee: Guozhang Wang
> Fix For: 0.10.1.0
>
>
> Link to the first failure on 18 August: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-08-18--001.1471540190--apache--trunk--40b1dd3/report.html
> The commit corresponding to the 18 August build was 
> https://github.com/apache/kafka/commit/40b1dd3f495a59ab, which is KIP-62 (and 
> before KIP-33)
> KAFKA-3807 tracks another test that started failing at the same time and 
> there's a possibility that the PR for that JIRA fixes this one too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4118) StreamsSmokeTest.test_streams started failing since 18 August build

2016-09-02 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-4118:
--

 Summary: StreamsSmokeTest.test_streams started failing since 18 
August build
 Key: KAFKA-4118
 URL: https://issues.apache.org/jira/browse/KAFKA-4118
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Ismael Juma
Assignee: Guozhang Wang
 Fix For: 0.10.1.0


Link to the first failure on 18 August: 
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-08-18--001.1471540190--apache--trunk--40b1dd3/report.html

The commit corresponding to the 18 August build was 
https://github.com/apache/kafka/commit/40b1dd3f495a59ab, which is KIP-62 (and 
before KIP-33)

KAFKA-3807 tracks another test that started failing at the same time and 
there's a possibility that the PR for that JIRA fixes this one too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3340) Add support for rebalance and adding concurrently records with MockConsumer

2016-09-02 Thread Florian Hussonnois (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459658#comment-15459658
 ] 

Florian Hussonnois commented on KAFKA-3340:
---

I have updated the pull request for this JIRA with some additions (I don't know 
why this PR is not tracked by JIRA).
https://github.com/apache/kafka/pull/1016

> Add support for rebalance and adding concurrently records with MockConsumer
> ---
>
> Key: KAFKA-3340
> URL: https://issues.apache.org/jira/browse/KAFKA-3340
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Florian Hussonnois
>Priority: Minor
>
> The MockConsumer class should support adding records concurrently.
> This allows implementing more complex test scenarios in which records are 
> added concurrently while records are being polled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #859

2016-09-02 Thread Apache Jenkins Server
See 

Changes:

[junrao] KAFKA-4099; Fix the potential frequent log rolling

--
[...truncated 3801 lines...]

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage STARTED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.cluster.BrokerEndPointTest > testEndpointFromUri STARTED

kafka.cluster.BrokerEndPointTest > testEndpointFromUri PASSED

kafka.cluster.BrokerEndPointTest > testHashAndEquals STARTED

kafka.cluster.BrokerEndPointTest > testHashAndEquals PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonFutureVersion STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonFutureVersion PASSED

kafka.cluster.BrokerEndPointTest > testBrokerEndpointFromUri STARTED

kafka.cluster.BrokerEndPointTest > testBrokerEndpointFromUri PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV1 STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV1 PASSED

kafka.cluster.BrokerEndPointTest > testFromJsonV2 STARTED

kafka.cluster.BrokerEndPointTest > testFromJsonV2 PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > 
testCleanLeaderElectionDisabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testCleanLeaderElectionDisabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled 
PASSED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionInvalidTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionEnabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > 
testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
PASSED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig STARTED

kafka.integration.MinIsrConfigTest > testDefaultKafkaConfig PASSED

kafka.integration.FetcherTest > testFetcher STARTED

kafka.integration.FetcherTest > testFetcher PASSED

kafka.integration.SslTopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack 
STARTED

kafka.integration.SslT

Build failed in Jenkins: kafka-trunk-jdk7 #1515

2016-09-02 Thread Apache Jenkins Server
See 

Changes:

[junrao] KAFKA-4099; Fix the potential frequent log rolling

--
[...truncated 3421 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
PASSED

kafka.integration.SslTopicMetadataTest > testIsrAfterBrokerShutDownAndJoinsBack 
STARTED

kafka.

Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-02 Thread Jay Kreps
This looks great, big improvements for the list offset protocol which is
currently quite odd.

One minor thing. I think the old v0 list offsets request also gave you the
highwater mark, it kind of shoves it in as the last thing in the array of
offsets. This is used internally to implement seekToEnd() iirc. How would
that work once v0 is removed?

Related, the wiki says:
"Another related feature missing in KafkaConsumer is the access of
partitions' high watermark. Typically, users only need the high watermark
in order to get the per partition lag. This seems more suitable to be
exposed through the metrics."

The obvious usage is computing lag for sure, and I agree that is really
more a metric than anything else, but I think that is not the only usage.
Here is a use case I think is quite important that requires knowing the
highwater mark:

Say you want to implement some kind of batch process that wakes up every 5
minutes, every hour, or once a day, processes all the messages, and then
goes back to sleep. The naive way to do that would be to poll() until you
don't get any more records, but this is broken in two minor ways: first,
maybe you didn't get records because you are rebalancing, and second, this
might never happen if new records are always getting written. A better
approach is for your process, when it begins, to look at the current end of
the log and process only up to that offset.

This is important for Kafka Streams or anything else that wants to have a
kind of batch-like mode.

Technically you can do this by seeking to the end, checking your position,
then starting over, as people do today. But I think we can agree that is
kind of silly.
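
For reference, the workaround today looks roughly like this against the 0.10.0
consumer API (a sketch for a single manually assigned partition, where
consumer is an already-configured KafkaConsumer<String, String> and process()
stands in for whatever the batch job does):

// Sketch of the "seek to end, remember the position, start over" workaround.
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));

long start = consumer.position(tp);     // where this run begins
consumer.seekToEnd(tp);                 // varargs overload in the 0.10.0 API
long batchEnd = consumer.position(tp);  // end of the log when the run started
consumer.seek(tp, start);               // go back and process up to batchEnd

while (consumer.position(tp) < batchEnd) {
    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
        if (record.offset() >= batchEnd)
            break;                      // ignore anything written after we started
        process(record);
    }
}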

An alternative would be to rename TimestampOffset to something like
PartitionOffsets and have it carry the timestamp and offset as well as
the beginning offset and high watermark for the partition. The underlying
protocol would need to expose the latter two.
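
Concretely, the returned value could carry something like the following
(purely a sketch of the shape being suggested here, not anything that exists
in the KIP yet):

// Hypothetical shape for the suggested PartitionOffsets result.
public class PartitionOffsets {
    public final long timestamp;        // timestamp of the located message
    public final long offset;           // first offset at or after the target time
    public final long beginningOffset;  // log start offset of the partition
    public final long highWatermark;    // current high watermark of the partition

    public PartitionOffsets(long timestamp, long offset,
                            long beginningOffset, long highWatermark) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.beginningOffset = beginningOffset;
        this.highWatermark = highWatermark;
    }
}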

Cheers,

-Jay

On Tue, Aug 30, 2016 at 8:38 PM, Becket Qin  wrote:

> Hi Kafka devs,
>
> I created KIP-79 to allow consumer to precisely query the offsets based on
> timestamp.
>
> In short we propose to :
> 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> 2. add an offsetForTime() method in new consumer.
>
> The KIP wiki is the following:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-02 Thread Jun Rao
Hi, Radi,

Thanks for the update. At the high level, this looks promising. A few
comments below.

1. If we can bound the requests by bytes, it seems that we don't need
queued.max.requests
any more? Could we just deprecate the config and make the queue size
unbounded?
2. How do we communicate back to the selector when some memory is freed up?
We probably need to wake up the selector. For efficiency, perhaps we only
need to wake up the selector if the bufferpool is full?
3. We talked about bounding the consumer's memory before. To fully support
that, we will need to bound the memory used by different fetch responses in
the consumer. Do you think the changes that you propose here can be
leveraged to bound the memory in the consumer as well?
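
(For context, the non-blocking pool being discussed is roughly of the
following shape; this is just an illustrative sketch, not the code on radai's
branch.)

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: tryAllocate() returns null instead of blocking when the
// configured bound would be exceeded, so the network thread can move on.
public class BoundedMemoryPool {
    private final AtomicLong availableBytes;

    public BoundedMemoryPool(long capacityBytes) {
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    public ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long available = availableBytes.get();
            if (available < sizeBytes)
                return null; // caller mutes the socket / retries later
            if (availableBytes.compareAndSet(available, available - sizeBytes))
                return ByteBuffer.allocate(sizeBytes);
        }
    }

    public void release(ByteBuffer buffer) {
        availableBytes.addAndGet(buffer.capacity());
    }
}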

Jun


On Tue, Aug 30, 2016 at 10:41 AM, radai  wrote:

> My apologies for the delay in response.
>
> I agree with the concerns about OOM reading from the actual sockets and
> blocking the network threads - messing with the request queue itself would
> not do.
>
> I propose instead a memory pool approach - the broker would have a
> non-blocking memory pool. Upon reading the first 4 bytes out of a socket, an
> attempt would be made to acquire enough memory, and if that attempt fails
> the processing thread will move on to try and make progress with other
> tasks.
>
> I think it's simpler than mute/unmute because using mute/unmute would
> require differentiating between sockets muted due to a request in progress
> (normal current operation) and sockets muted due to lack of memory. Sockets
> of the 1st kind would be unmuted at the end of request processing (as it
> happens right now) but the 2nd kind would require some sort of "unmute
> watchdog" which is (I claim) more complicated than a memory pool. Also, a
> memory pool is a more generic solution.
>
> I've updated the KIP page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
> to reflect the new proposed implementation, and I've also put up an initial
> implementation proposal on GitHub -
> https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool. The
> proposed code is not yet complete or tested (so probably buggy) but does
> include the main points of modification.
>
> The specific implementation of the pool on that branch also has a built-in
> safety net where memory that is acquired but not released (which is a bug)
> is discovered when the garbage collector frees it and the capacity is
> reclaimed.
>
> On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao  wrote:
>
> > Radi,
> >
> > Yes, I got the benefit of bounding the request queue by bytes. My concern
> > is the following if we don't change the behavior of processor blocking on
> > queue full.
> >
> > If the broker truly doesn't have enough memory for buffering outstanding
> > requests from all connections, we have to either hit OOM or block the
> > processor. Both will be bad. I am not sure if one is clearly better than
> > the other. In this case, the solution is probably to expand the cluster
> to
> > reduce the per broker request load.
> >
> > If the broker actually has enough memory, we want to be able to configure
> > the request queue in such a way that it never blocks. You can tell people
> > to just set the request queue to be unbounded, which may scare them. If
> we
> > do want to put a bound, it seems it's easier to configure the queue size
> > based on # requests. Basically, we can tell people to set the queue size
> > based on number of connections. If the queue is based on bytes, it's not
> > clear how people should set it w/o causing the processor to block.
> >
> > Finally, Rajini has a good point. The ByteBuffer in the request object is
> > allocated as soon as we see the first 4 bytes from the socket. So, I am
> not
> > sure if just bounding the request queue itself is enough to bound the
> > memory related to requests.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Aug 8, 2016 at 4:46 PM, radai 
> wrote:
> >
> > > I agree that filling up the request queue can cause clients to time out
> > > (and presumably retry?). However, for the workloads where we expect
> this
> > > configuration to be useful the alternative is currently an OOM crash.
> > > In my opinion an initial implementation of this feature could be
> > > constrained to a simple drop-in replacement of ArrayBlockingQueue
> > > (conditional, opt-in) and further study of behavior patterns under load
> > can
> > > drive future changes to the API later when those behaviors are better
> > > understood (like back-pressure, nop filler responses to avoid client
> > > timeouts or whatever).
> > >
> > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Nice write up Radai.
> > > > I think what Jun said is a valid concern.
> > > > If I am not wrong as per the proposal, we are depending on the entire
> > > > pipeline to flow smoothly from accepting requests to h

Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-02 Thread Jun Rao
Hi, Jiangjie,

Thanks for the wiki. Looks good overall. Just one comment below.

1. In ListOffsetResponse, I am not sure if it's useful to return the
timestamp since the user can always find the timestamp by fetching the
message at the returned offset.

Jun


On Tue, Aug 30, 2016 at 8:38 PM, Becket Qin  wrote:

> Hi Kafka devs,
>
> I created KIP-79 to allow consumer to precisely query the offsets based on
> timestamp.
>
> In short we propose to :
> 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> 2. add an offsetForTime() method in new consumer.
>
> The KIP wiki is the following:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


[jira] [Commented] (KAFKA-3807) OffsetValidationTest - transient failure on test_broker_rolling_bounce

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459508#comment-15459508
 ] 

ASF GitHub Bot commented on KAFKA-3807:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1821

KAFKA-3807: Fix transient test failure caused by race on future completion



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-3807

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1821.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1821






> OffsetValidationTest - transient failure on test_broker_rolling_bounce
> --
>
> Key: KAFKA-3807
> URL: https://issues.apache.org/jira/browse/KAFKA-3807
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Reporter: Geoff Anderson
>Assignee: Jason Gustafson
>
> {code}
> test_id:
> 2016-05-28--001.kafkatest.tests.client.consumer_test.OffsetValidationTest.test_broker_rolling_bounce
> status: FAIL
> run time:   3 minutes 8.042 seconds
> Broker rolling bounce caused 2 unexpected group rebalances
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/client/consumer_test.py",
>  line 108, in test_broker_rolling_bounce
> "Broker rolling bounce caused %d unexpected group rebalances" % 
> unexpected_rebalances
> AssertionError: Broker rolling bounce caused 2 unexpected group rebalances
> {code}
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-05-28--001.1464455059--apache--trunk--7b7c4a7/report.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1821: KAFKA-3807: Fix transient test failure caused by r...

2016-09-02 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/1821

KAFKA-3807: Fix transient test failure caused by race on future completion



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-3807

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1821.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1821






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4099) Change the time based log rolling to only based on the message timestamp.

2016-09-02 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-4099:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1809
[https://github.com/apache/kafka/pull/1809]

> Change the time based log rolling to only based on the message timestamp.
> -
>
> Key: KAFKA-4099
> URL: https://issues.apache.org/jira/browse/KAFKA-4099
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> This is an issue introduced in KAFKA-3163. When partition relocation occurs, 
> the newly created replica may have messages with old timestamps and cause 
> log segment rolling for each message. The fix is to change the log rolling 
> behavior to be based only on the message timestamp when the messages are in 
> message format 0.10.0 or above. If the first message in the segment does not 
> have a timestamp, we will fall back to using the wall clock time for log rolling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1809: KAFKA-4099: Fix the potential frequent log rolling

2016-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1809


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4099) Change the time based log rolling to only based on the message timestamp.

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459428#comment-15459428
 ] 

ASF GitHub Bot commented on KAFKA-4099:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1809


> Change the time based log rolling to only based on the message timestamp.
> -
>
> Key: KAFKA-4099
> URL: https://issues.apache.org/jira/browse/KAFKA-4099
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.1.0
>
>
> This is an issue introduced in KAFKA-3163. When partition relocation occurs, 
> the newly created replica may have messages with old timestamps and cause 
> log segment rolling for each message. The fix is to change the log rolling 
> behavior to be based only on the message timestamp when the messages are in 
> message format 0.10.0 or above. If the first message in the segment does not 
> have a timestamp, we will fall back to using the wall clock time for log rolling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1820: MINOR: Add timeout to testRenegotiation.

2016-09-02 Thread SinghAsDev
GitHub user SinghAsDev opened a pull request:

https://github.com/apache/kafka/pull/1820

MINOR: Add timeout to testRenegotiation.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SinghAsDev/kafka MinorTestRenogtiation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1820


commit 10ff174518d714e485488f962135505a5db7e0d4
Author: Ashish Singh 
Date:   2016-09-02T18:39:57Z

MINOR: Add timeout to testRenegotiation.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4117) Cleanup StreamPartitionAssignor behavior

2016-09-02 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4117:
-
Description: 
I went through the whole assignment logic once again and I feel the logic has 
now become a bit lossy, and I want to clean it up, probably in another PR, but 
will just dump my thoughts here on the appropriate logic:

Some background:

1. Each {{KafkaStreams}} instance contains a clientId, and if not specified the 
default value is applicationId-1/2/etc if there are multiple instances inside 
the same JVM. One instance contains multiple threads where the thread-clientId 
is constructed as clientId-StreamThread-1/2/etc, and the thread-clientId is 
used as the embedded consumer clientId as well as a metrics tag.

2. However, since one instance can contain multiple threads, and hence multiple 
consumers, when considering partition assignment the streams library needs 
to take the capacity into consideration at the granularity of instances, not 
threads. Therefore we create a 4-byte {{UUID.randomUUID()}} as the 
processId and encode that in the subscription metadata bytes, and the leader 
then knows whether multiple consumer members actually belong to the same 
instance (i.e. belong to threads of that instance), so that when assigning 
partitions it can balance among instances. NOTE that in production we recommend 
one thread per instance, so consumersByClient will only have one consumer per 
client (i.e. instance).

3. In addition, historically we hard-code the partition grouper logic, where 
each task is assigned only one partition from each of its subscribed topics. 
For example, if we have topicA with 5 partitions and topicB with 10 partitions, 
we will create 10 tasks, with the first five tasks containing one partition 
from each topic, while the last five tasks contain only one partition from 
topicB. The TaskId class therefore contains the groupId of the sub-topology 
and the partition, so that taskId(group, 1) gets partition1 of topicA and 
partition1 of topicB. We later exposed this to users to customize so that more 
than one partition of a topic can be assigned to the same task, so the 
partition field in the TaskId no longer indicates anything about which 
partitions are assigned, and we add {{AssignedPartitions}} to capture which 
partitions are assigned to which tasks.

4. While doing the assignment, the leader is also responsible for creating 
the changelog / repartition topics, and the number of partitions of these 
topics is equal to the number of tasks that need to write to these topics, 
which are wrapped in {{stateChangelogTopicToTaskIds}} and 
{{internalSourceTopicToTaskIds}} respectively. After such topics are created, 
the leader also needs to "augment" the received cluster metadata with these 
topics to 1) check for copartitioning, and 2) maintain it for QueryableState's 
discovery function.

The current implementation is mixed up with all this legacy logic and gets quite 
messy, and I'm thinking of making a pass over the StreamPartitionAssignor and 
cleaning it up a bit. More precisely:

1. Read and parse the subscription information to construct the clientMetadata 
map, where each metadata contains the {{Set consumerMemberIds}}, 
{{ClientState state}}, and {{HostInfo hostInfo}} (a rough sketch of this holder 
follows the list below).

2. Access the (sub-)topology to create the corresponding changelog / 
repartition topics and construct the {{stateChangelogTopicToTaskIds}} and 
{{internalSourceTopicToTaskIds}}.

3. Call {{streamThread.partitionGrouper.partitionGroups}} to get the map from 
created tasks to their assigned partitions.

4. Call {{TaskAssignor.assign}} (which now takes the whole clientMetadata map) 
to assign tasks to clients, and hence we get the assigned partitions to clients.

5. For each client, use some round-robin manner (as we do now) to assign tasks 
to their hosted consumers with the {{clientMetadata.consumerMemberIds}} map.

6. Check co-partitioning of assigned partitions, and maintain the {{Cluster}} 
metadata locally on the leader.

7. Construct the assignment info, where activeTasks is also a map from 
{{TaskId}} to list of {{TopicPartitions}} since otherwise we will not know 
which partitions are assigned to which tasks.

8. For non-leaders, when getting the assignment, also construct the Cluster 
metadata from the decoded assignment information; and also maintain the 
AssignmentInfo locally for constructing the tasks.
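
A rough sketch of the per-client metadata holder described in step 1 above 
(the field names are taken from the description; everything else is 
illustrative only):

{code}
// Illustrative only: groups the per-instance data the leader needs during assignment.
class ClientMetadata {
    final Set<String> consumerMemberIds = new HashSet<>(); // consumers (threads) of this instance
    ClientState state;   // capacity / previously assigned tasks for this instance
    HostInfo hostInfo;   // user endpoint for queryable state discovery
}
{code}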

And some minor improvements:

1. The default {{thread-clientIds applicationId-x-StreamThread-y}} may still 
conflict with each other across multiple JVMs / machines, which is bad for 
metrics collection / debugging across hosts. We can modify the default clientId 
to {{applicationId-processId}} where processId is the generated UUID, hence the 
default thread-clientId is {{applicationId-UUID-StreamThread-y}}.

2. The {{TaskId.partition}} field no longer indicate which partitions are

[jira] [Created] (KAFKA-4117) Cleanup StreamPartitionAssignor behavior

2016-09-02 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-4117:


 Summary: Cleanup StreamPartitionAssignor behavior
 Key: KAFKA-4117
 URL: https://issues.apache.org/jira/browse/KAFKA-4117
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Guozhang Wang


I went through the whole assignment logic once again and I feel the logic has 
now become a bit lossy, and I want to clean it up, probably in another PR, but 
will just dump my thoughts here on the appropriate logic (also cc @enothereska 
@mjsax):

Some background:

1. Each KafkaStreams instance contains a clientId, and if not specified the 
default value is applicationId-1/2/etc if there are multiple instances inside 
the same JVM. One instance contains multiple threads where the thread-clientId 
is constructed as clientId-StreamThread-1/2/etc, and the thread-clientId is 
used as the embedded consumer clientId as well as a metrics tag.

2. However, since one instance can contain multiple threads, and hence multiple 
consumers, when considering partition assignment the streams library needs 
to take the capacity into consideration at the granularity of instances, not 
threads. Therefore we create a 4-byte UUID.randomUUID() as the processId 
and encode that in the subscription metadata bytes, and the leader then knows 
whether multiple consumer members actually belong to the same instance (i.e. 
belong to threads of that instance), so that when assigning partitions it can 
balance among instances. NOTE that in production we recommend one thread per 
instance, so consumersByClient will only have one consumer per client (i.e. 
instance).

3. In addition, historically we hard-code the partition grouper logic, where 
each task is assigned only one partition from each of its subscribed topics. 
For example, if we have topicA with 5 partitions and topicB with 10 partitions, 
we will create 10 tasks, with the first five tasks containing one partition 
from each topic, while the last five tasks contain only one partition from 
topicB. The TaskId class therefore contains the groupId of the sub-topology 
and the partition, so that taskId(group, 1) gets partition1 of topicA and 
partition1 of topicB. We later exposed this to users to customize so that more 
than one partition of a topic can be assigned to the same task, so the 
partition field in the TaskId no longer indicates anything about which 
partitions are assigned, and we add AssignedPartitions to capture which 
partitions are assigned to which tasks.

4. While doing the assignment, the leader is also responsible for creating 
the changelog / repartition topics, and the number of partitions of these 
topics is equal to the number of tasks that need to write to these topics, 
which are wrapped in stateChangelogTopicToTaskIds and 
internalSourceTopicToTaskIds respectively. After such topics are created, the 
leader also needs to "augment" the received cluster metadata with these topics 
to 1) check for copartitioning, and 2) maintain it for QueryableState's 
discovery function.

The current implementation is mixed up with all this legacy logic and gets quite 
messy, and I'm thinking of making a pass over the StreamPartitionAssignor and 
cleaning it up a bit. More precisely:

1. Read and parse the subscription information to construct the clientMetadata 
map, where each metadata contains the Set consumerMemberIds, 
ClientState state, and HostInfo hostInfo.

2. Access the (sub-)topology to create the corresponding changelog / 
repartition topics and construct the stateChangelogTopicToTaskIds and 
internalSourceTopicToTaskIds.

3. Call streamThread.partitionGrouper.partitionGroups to get the map from created 
tasks to their assigned partitions.

4. Call TaskAssignor.assign (which now takes the whole clientMetadata map) to 
assign tasks to clients, and hence we get the assigned partitions to clients.

5. For each client, use some round-robin manner (as we do now) to assign tasks 
to their hosted consumers with the clientMetadata.consumerMemberIds map.

6. Check co-partitioning of assigned partitions, and maintain the Cluster 
metadata locally on the leader.

7. Construct the assignment info, where activeTasks is also a map from TaskId 
to list of TopicPartitions since otherwise we will not know which partitions 
are assigned to which tasks.

8. For non-leaders, when getting the assignment, also construct the Cluster 
metadata from the decoded assignment information; and also maintain the 
AssignmentInfo locally for constructing the tasks.

And some minor improvements:

1. The default thread-clientIds applicationId-x-StreamThread-y may still 
conflict with each other across multiple JVMs / machines, which is bad for 
metrics collection / debugging across hosts. We can modify the default clientId 
to applicationId-processId where processId is the generated UUID, hence the 
default thread-clientId is applicationId-UUID-StreamThr

[jira] [Commented] (KAFKA-3478) Finer Stream Flow Control

2016-09-02 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458985#comment-15458985
 ] 

Bill Bejeck commented on KAFKA-3478:


Makes sense, but I didn't realize that was the case, thanks.

> Finer Stream Flow Control
> -
>
> Key: KAFKA-3478
> URL: https://issues.apache.org/jira/browse/KAFKA-3478
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Today we have a event-time based flow control mechanism in order to 
> synchronize multiple input streams in a best effort manner:
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> However, there are some use cases where users would like to have finer 
> control of the input streams, for example, with two input streams, one of 
> them always reading from offset 0 upon (re)-starting, and the other reading 
> for log end offset.
> Today we only have one consumer config "offset.auto.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> We should consider how to improve this settings to allow users have finer 
> control over these frameworks.
> =
> A finer flow control could also be used to allow for populating a {{KTable}} 
> (with an "initial" state) before starting the actual processing (this feature 
> was ask for in the mailing list multiple times already). Even if it is quite 
> hard to define, *when* the initial populating phase should end, this might 
> still be useful. There would be the following possibilities:
>  1) an initial fixed time period for populating
>(it might be hard for a user to estimate the correct value)
>  2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
>  3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
>  4) a throughput threshold, ie, if the populating frequency falls below
> the threshold, the KTable is considered "finished"
>  5) maybe something else ??
> The API might look something like this
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4114) Allow for different "auto.offset.reset" strategies for different input streams

2016-09-02 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-4114:
--

Assignee: Bill Bejeck  (was: Guozhang Wang)

> Allow for different "auto.offset.reset" strategies for different input streams
> --
>
> Key: KAFKA-4114
> URL: https://issues.apache.org/jira/browse/KAFKA-4114
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>
> Today we only have one consumer config "offset.auto.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> However, it would be useful to improve this settings to allow users have 
> finer control over different input stream. For example, with two input 
> streams, one of them always reading from offset 0 upon (re)-starting, and the 
> other reading for log end offset.
> This JIRA requires to extend {{KStreamBuilder}} API for methods 
> {{.stream(...)}} and {{.table(...)}} to add a new parameter that indicate the 
> initial offset to be used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1819: WIP: add context to some exceptions

2016-09-02 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/1819

WIP: add context to some exceptions

KafkaExceptions thrown from within StreamThread/StreamTask currently bubble 
up without any additional context. This makes it hard to figure out where 
something went wrong, e.g., which topic had the serialization exception, etc.
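
To illustrate the intent (this is not the actual patch in this PR), here is a 
minimal sketch of wrapping a failure with the topic / partition / offset it 
came from:

{code}
import java.nio.charset.StandardCharsets;

public class ExceptionContextSketch {
    // Hypothetical helper, for illustration only.
    static String deserializeValue(byte[] value, String topic, int partition, long offset) {
        try {
            return new String(value, StandardCharsets.UTF_8);
        } catch (RuntimeException e) {
            // Re-throw with enough context to locate the offending record.
            throw new RuntimeException(String.format(
                    "Failed to deserialize value for %s-%d at offset %d", topic, partition, offset), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deserializeValue("hello".getBytes(StandardCharsets.UTF_8), "my-topic", 0, 42L));
    }
}
{code}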

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-3708

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1819


commit 8f37e2c48908da02ad7ac2111b70541ea918a769
Author: Damian Guy 
Date:   2016-09-02T15:29:36Z

add some context to exceptions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3478) Finer Stream Flow Control

2016-09-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458831#comment-15458831
 ] 

Matthias J. Sax commented on KAFKA-3478:


All newly created "Streams" JIRAs are assigned to him by default on creation. 
This does not mean anything. Just reassign it to yourself if you want to pick 
one up.

> Finer Stream Flow Control
> -
>
> Key: KAFKA-3478
> URL: https://issues.apache.org/jira/browse/KAFKA-3478
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Today we have a event-time based flow control mechanism in order to 
> synchronize multiple input streams in a best effort manner:
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> However, there are some use cases where users would like to have finer 
> control of the input streams, for example, with two input streams, one of 
> them always reading from offset 0 upon (re)-starting, and the other reading 
> for log end offset.
> Today we only have one consumer config "offset.auto.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> We should consider how to improve this settings to allow users have finer 
> control over these frameworks.
> =
> A finer flow control could also be used to allow for populating a {{KTable}} 
> (with an "initial" state) before starting the actual processing (this feature 
> was ask for in the mailing list multiple times already). Even if it is quite 
> hard to define, *when* the initial populating phase should end, this might 
> still be useful. There would be the following possibilities:
>  1) an initial fixed time period for populating
>(it might be hard for a user to estimate the correct value)
>  2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
>  3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
>  4) a throughput threshold, ie, if the populating frequency falls below
> the threshold, the KTable is considered "finished"
>  5) maybe something else ??
> The API might look something like this
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4024) First metadata update always take retry.backoff.ms milliseconds to complete

2016-09-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458699#comment-15458699
 ] 

Yuto Kawamura commented on KAFKA-4024:
--

Updated the PR to fix this issue not only for the first metadata update but 
also for the cases I just explained in the above comment.
After applying my patch, the first send() is no longer blocked by the first 
metadata update (*4), and the second metadata update happens immediately after 
the KafkaProducer detects the broker disconnection (*5, *6).

{code}
Experimenting with retry.backoff.ms = 1
...
[2016-09-02 22:48:22,936] INFO Kafka version : 0.10.1.0-SNAPSHOT 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,936] INFO Kafka commitId : 8f3462552fa4d6a6 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,939] DEBUG Initialize connection to node -2 for sending 
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:22,939] DEBUG Initiating connection to node -2 at 
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,001] DEBUG Completed connection to node -2 
(org.apache.kafka.clients.NetworkClient)

# *4 The first metadata update happenes immediately.
[2016-09-02 22:48:23,020] DEBUG Sending metadata request {topics=[test]} to 
node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,043] DEBUG Updated cluster metadata version 2 to 
Cluster(nodes = [HOST-1:9092 (id: 1 rack: null), HOST-2:9092 (id: 2 rack: 
null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, 
partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), 
Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = 
[3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = 
[1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=119
[2016-09-02 22:48:23,057] DEBUG Initiating connection to node 3 at HOST-3:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,060] DEBUG Completed connection to node 3 
(org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=129, exception=null
Send[1]: duration=0
[2016-09-02 22:48:24,060] DEBUG Initiating connection to node 1 at HOST-1:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:24,062] DEBUG Completed connection to node 1 
(org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=10, exception=null
Send[2]: duration=0
[2016-09-02 22:48:25,066] DEBUG Initiating connection to node 2 at HOST-2:9092. 
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:25,068] DEBUG Completed connection to node 2 
(org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=6, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null

# *5 I stopped broker 1 at this moment
[2016-09-02 22:48:26,301] DEBUG Node 1 disconnected. 
(org.apache.kafka.clients.NetworkClient)

# *6 Metadata updated immediately after the producer detects broker 
disconnection
[2016-09-02 22:48:26,301] DEBUG Sending metadata request {topics=[test]} to 
node 2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:26,308] DEBUG Updated cluster metadata version 3 to 
Cluster(nodes = [HOST-3:9092 (id: 3 rack: null), HOST-1:9092 (id: 1 rack: 
null), HOST-2:9092 (id: 2 rack: null)], partitions = [Partition(topic = test, 
partition = 1, leader = 2, replicas = [1,2,3,], isr = [2,3,]), Partition(topic 
= test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,]), 
Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = 
[3,2,])]) (org.apache.kafka.clients.Metadata)
Send[4]: duration=0
Produce[4]: duration=4, exception=null
{code}


> First metadata update always take retry.backoff.ms milliseconds to complete
> ---
>
> Key: KAFKA-4024
> URL: https://issues.apache.org/jira/browse/KAFKA-4024
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration, specifically we adjusted 
> {{retry.backoff.ms}} from default(100ms) to 1000ms.
> After that we observed that the first {{send()}} start taking longer than 
> before, investigated then found following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1 but it reproduced with latest build from trunk 
> branch as well.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocked {{retry.backoff.ms}} 
> milliseconds, due to unintentionally applied backoff on first metadata update.
> h2. Proof
> I wrote following test code and placed under the clients/main/java/
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import

[jira] [Commented] (KAFKA-4024) First metadata update always take retry.backoff.ms milliseconds to complete

2016-09-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458639#comment-15458639
 ] 

Yuto Kawamura commented on KAFKA-4024:
--

I reconsidered this issue and think it is much worse than I explained before.

IIUC, in short, setting {{retry.backoff.ms}} to a larger value can delay the 
KafkaProducer from updating outdated metadata.
That is, when we set {{retry.backoff.ms}} to 1 second for example, and a 
partition leadership failover happens, the producer will take 1 second to fire 
a metadata request in the worst case, even though it could already have 
detected the broker disconnection or the outdated partition leadership 
information.

Here's the result of my experiment. I modified 
{{KafkaProducerMetadataUpdateDurationTest}} and observed DEBUG logs of 
NetworkClient and Metadata.

clients/src/main/java/org/apache/kafka/clients/Metadata.java:
{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "HOST-1:9092,HOST-2:9092,HOST-3:9092");
        props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        try {
            int i = 0;
            while (true) {
                final int produceSeq = i++;
                final long t0 = System.nanoTime();
                producer.send(new ProducerRecord<>("test", produceSeq % 3, "key", "value"),
                              new Callback() {
                                  @Override
                                  public void onCompletion(RecordMetadata metadata, Exception exception) {
                                      long produceDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                                      System.err.printf("Produce[%d]: duration=%d, exception=%s\n",
                                                        produceSeq, produceDuration, exception);
                                  }
                              });
                long sendDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                System.err.printf("Send[%d]: duration=%d\n", produceSeq, sendDuration);
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
{code}

log4j.properties:
{code}
log4j.rootLogger=INFO, stdout

log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.Metadata=false
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.producer.internals.Sender=false

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}

Topic "test" has 3 replicas and 3 partitions.
Then I started KafkaProducerMetadataUpdateDurationTest, and stopped broker 1 
manually at (*2). Here's the log:

{code}
./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties 
-Dretry.backoff.ms=1 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1
...
[2016-09-02 22:36:29,839] INFO Kafka version : 0.10.1.0-SNAPSHOT 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:29,839] INFO Kafka commitId : 8f3462552fa4d6a6 
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:39,826] DEBUG Initialize connection to node -2 for sending 
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,826] DEBUG Initiating connection to node -2 at 
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,883] DEBUG Completed connection to node -2 
(org.apache.kafka.clients.NetworkClient)

# *1 The first metadata request
[2016-09-02 22:36:39,902] DEB

[jira] [Updated] (KAFKA-4024) First metadata update always take retry.backoff.ms milliseconds to complete

2016-09-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-4024:
-
Description: 
Recently I updated our KafkaProducer configuration; specifically, we adjusted 
{{retry.backoff.ms}} from the default (100ms) to 1000ms.
After that we observed that the first {{send()}} started taking longer than 
before; after investigating, we found the following facts.

Environment:
- Kafka broker 0.9.0.1
- Kafka producer 0.9.0.1

Our current version is 0.9.0.1, but it reproduces with the latest build from 
the trunk branch as well.

h2. TL;DR
The first {{KafkaProducer.send()}} is always blocked for {{retry.backoff.ms}} 
milliseconds, due to a backoff that is unintentionally applied to the first 
metadata update.


h2. Proof
I wrote the following test code and placed it under clients/main/java/

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3");
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());

        long t0 = System.nanoTime();
        try {
            producer.partitionsFor("test");
            long duration = System.nanoTime() - t0;
            System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
        } finally {
            producer.close();
        }
    }
}
{code}

Here's experiment log:
{code}
# Start zookeeper & kafka broker
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

# Create test topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 100
Duration = 175 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1000
Duration = 1066 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1
Duration = 10070 ms
{code}

As you can see, the duration of {{partitionsFor()}} increases linearly in 
proportion to the value of {{retry.backoff.ms}}.

Here is the scenario that leads to this behavior:
1. KafkaProducer initializes its metadata with the given {{bootstrap.servers}} 
and the current timestamp: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
2. On the first {{send()}}, KafkaProducer requests a metadata update due to 
missing partition info: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
3. However, DefaultMetadataUpdater doesn't actually send the MetadataRequest, 
because {{metadata.timeToNextUpdate}} returns a value larger than zero: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata 
expiration and the time until the backoff expires, but in practice needUpdate 
is always true the first time, so timeToAllowUpdate is always adopted, and it 
is never zero until {{retry.backoff.ms}} has elapsed since the first 
{{metadata.update()}}: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116


This is because the Kafka client tries to keep the interval configured by 
{{retry.backoff.ms}} between metadata updates. This basically works fine from 
the second update onwards, but for the first update, since the client has 
never had actual metadata (which is obtained by a MetadataRequest), this 
backoff doesn't make sense and in fact it harms our application by blocking 
the first {{send()}} for an unreasonably long time.
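
Below is a simplified paraphrase of the logic in step 4 (not the actual 
{{Metadata}} code; field names are illustrative), showing why the first forced 
update still has to wait out the backoff that was started at construction time:

{code}
public class MetadataBackoffSketch {
    // Simplified stand-in for Metadata.timeToNextUpdate; for illustration only.
    static long timeToNextUpdate(long nowMs, long lastRefreshMs, long lastSuccessfulRefreshMs,
                                 boolean needUpdate, long metadataExpireMs, long refreshBackoffMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long constructedAt = 0;        // lastRefreshMs is set when the producer is constructed
        long retryBackoffMs = 10000;   // retry.backoff.ms
        // The first send() happens 100 ms later and forces needUpdate = true,
        // but the backoff measured from construction time still applies:
        long wait = timeToNextUpdate(100, constructedAt, constructedAt, true, 300000, retryBackoffMs);
        System.out.println("first metadata update delayed by " + wait + " ms"); // prints 9900
    }
}
{code}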

  was:
>>Recently I updated our KafkaProducer configuration, specifically we 
adjusted {{retry.backoff.ms}} from default(100ms) to 1000ms.
After that we observed that the first {{send()}} start taking longer than 
before, investigated then found following facts.

Environment:
- Kafka broker 0.9.0.1

[jira] [Updated] (KAFKA-4024) First metadata update always take retry.backoff.ms milliseconds to complete

2016-09-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-4024:
-
Description: 
>>Recently I updated our KafkaProducer configuration, specifically we 
adjusted {{retry.backoff.ms}} from default(100ms) to 1000ms.
After that we observed that the first {{send()}} start taking longer than 
before, investigated then found following facts.

Environment:
- Kafka broker 0.9.0.1
- Kafka producer 0.9.0.1

Our current version is 0.9.0.1 but it reproduced with latest build from trunk 
branch as well.

h2. TL;DR
The first {{KafkaProducer.send()}} always blocked {{retry.backoff.ms}} 
milliseconds, due to unintentionally applied backoff on first metadata update.


h2. Proof
I wrote following test code and placed under the clients/main/java/

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3");
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());

        long t0 = System.nanoTime();
        try {
            producer.partitionsFor("test");
            long duration = System.nanoTime() - t0;
            System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
        } finally {
            producer.close();
        }
    }
}
{code}

Here's experiment log:
{code}
# Start zookeeper & kafka broker
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

# Create test topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 100
Duration = 175 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1000
Duration = 1066 ms

$ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1 
KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 1
Duration = 10070 ms
{code}

As you can see, the duration of {{partitionsFor()}} increases linearly in 
proportion to the value of {{retry.backoff.ms}}.

Here is the scenario that leads to this behavior:
1. KafkaProducer initializes its metadata with the given {{bootstrap.servers}} 
and the current timestamp: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
2. On the first {{send()}}, KafkaProducer requests a metadata update due to 
missing partition info: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
3. However, DefaultMetadataUpdater doesn't actually send the MetadataRequest, 
because {{metadata.timeToNextUpdate}} returns a value larger than zero: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata 
expiration and the time until the backoff expires, but in practice needUpdate 
is always true the first time, so timeToAllowUpdate is always adopted, and it 
is never zero until {{retry.backoff.ms}} has elapsed since the first 
{{metadata.update()}}: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116


This is because the Kafka client tries to keep the interval configured by 
{{retry.backoff.ms}} between metadata updates. This basically works fine from 
the second update onwards, but for the first update, since the client has 
never had actual metadata (which is obtained by a MetadataRequest), this 
backoff doesn't make sense and in fact it harms our application by blocking 
the first {{send()}} for an unreasonably long time.

  was:
Recently I updated our KafkaProducer configuration, specifically we adjusted 
{{retry.backoff.ms}} from default(100ms) to 1000ms.
After that we observed that the first {{send()}} start taking longer than 
before, investigated then found following facts.

Environment:
- Kafka broker 0.9.0.1

[jira] [Commented] (KAFKA-3587) LogCleaner fails due to incorrect offset map computation on a replica

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458428#comment-15458428
 ] 

ASF GitHub Bot commented on KAFKA-3587:
---

GitHub user id opened a pull request:

https://github.com/apache/kafka/pull/1818

Backport KAFKA-3587



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/klarna/kafka backport-KAFKA-3587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1818


commit f2e99a874b30bb3b26f15348a9ac5f55dd7d700e
Author: Ivan Dyachkov 
Date:   2016-09-02T09:43:15Z

Applying https://github.com/apache/kafka/pull/1332

KAFKA-3587: LogCleaner fails due to incorrect offset map computation

commit c7453a0a4a782a5c2194fb6d9d18f76034d2490e
Author: Ivan Dyachkov 
Date:   2016-09-02T11:47:33Z

fix test




> LogCleaner fails due to incorrect offset map computation on a replica
> -
>
> Key: KAFKA-3587
> URL: https://issues.apache.org/jira/browse/KAFKA-3587
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Linux
>Reporter: Kiran Pillarisetty
>Assignee: Edoardo Comar
> Fix For: 0.10.0.0
>
> Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=30
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=30
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> .log  --print-data-log
> Dumping .log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [k

[GitHub] kafka pull request #1818: Backport KAFKA-3587

2016-09-02 Thread id
GitHub user id opened a pull request:

https://github.com/apache/kafka/pull/1818

Backport KAFKA-3587



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/klarna/kafka backport-KAFKA-3587

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1818


commit f2e99a874b30bb3b26f15348a9ac5f55dd7d700e
Author: Ivan Dyachkov 
Date:   2016-09-02T09:43:15Z

Applying https://github.com/apache/kafka/pull/1332

KAFKA-3587: LogCleaner fails due to incorrect offset map computation

commit c7453a0a4a782a5c2194fb6d9d18f76034d2490e
Author: Ivan Dyachkov 
Date:   2016-09-02T11:47:33Z

fix test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3478) Finer Stream Flow Control

2016-09-02 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458306#comment-15458306
 ] 

Bill Bejeck commented on KAFKA-3478:


[~mjsax] thanks for the heads up, but it looks like they are assigned to 
[~guozhang]

> Finer Stream Flow Control
> -
>
> Key: KAFKA-3478
> URL: https://issues.apache.org/jira/browse/KAFKA-3478
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Today we have a event-time based flow control mechanism in order to 
> synchronize multiple input streams in a best effort manner:
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> However, there are some use cases where users would like to have finer 
> control of the input streams, for example, with two input streams, one of 
> them always reading from offset 0 upon (re)-starting, and the other reading 
> for log end offset.
> Today we only have one consumer config "offset.auto.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> We should consider how to improve this settings to allow users have finer 
> control over these frameworks.
> =
> A finer flow control could also be used to allow for populating a {{KTable}} 
> (with an "initial" state) before starting the actual processing (this feature 
> was ask for in the mailing list multiple times already). Even if it is quite 
> hard to define, *when* the initial populating phase should end, this might 
> still be useful. There would be the following possibilities:
>  1) an initial fixed time period for populating
>(it might be hard for a user to estimate the correct value)
>  2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
>  3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
>  4) a throughput threshold, ie, if the populating frequency falls below
> the threshold, the KTable is considered "finished"
>  5) maybe something else ??
> The API might look something like this
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2016-09-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458144#comment-15458144
 ] 

Matthias J. Sax commented on KAFKA-4113:


Thanks for pointing out! I read it once. Good post! :)

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #858

2016-09-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4104: Queryable state metadata is sometimes invalid

--
[...truncated 7235 lines...]
kafka.log.LogTest > testReadOutOfRange STARTED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testAppendWithOutOfOrderOffsetsThrowsException STARTED

kafka.log.LogTest > testAppendWithOutOfOrderOffsetsThrowsException PASSED

kafka.log.LogTest > 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete STARTED

kafka.log.LogTest > 
shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete PASSED

kafka.log.LogTest > testReadAtLogGap STARTED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testTimeBasedLogRoll STARTED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog STARTED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testMessageSetSizeCheck STARTED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testIndexResizingAtTruncation STARTED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testCompactedTopicConstraints STARTED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset STARTED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets STARTED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testDeleteOldSegmentsMethod STARTED

kafka.log.LogTest > testDeleteOldSegmentsMethod PASSED

kafka.log.LogTest > shouldDeleteSizeBasedSegments STARTED

kafka.log.LogTest > shouldDeleteSizeBasedSegments PASSED

kafka.log.LogTest > testParseTopicPartitionNameForNull STARTED

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets STARTED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testCorruptIndexRebuild STARTED

kafka.log.LogTest > testCorruptIndexRebuild PASSED

kafka.log.LogTest > shouldDeleteTimeBasedSegmentsReadyToBeDeleted STARTED

kafka.log.LogTest > shouldDeleteTimeBasedSegmentsReadyToBeDeleted PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved STARTED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testCompressedMessages STARTED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload STARTED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog STARTED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset STARTED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testReopenThenTruncate STARTED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName STARTED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles STARTED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages STARTED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages PASSED

kafka.log.LogTest > testSizeBasedLogRoll STARTED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
STARTED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter STARTED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testParseTopicPartitionName STARTED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testTruncateTo STARTED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile STARTED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets STARTED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets PASSED

kafka.log.LogConfigTest > testFromPropsEmpty STARTED

kafka.log.LogConfigTest > testFromPropsEmpty PASSED

kafka.log.LogConfigTest > testKafkaConfigToProps STARTED

kafka.log.LogConfigTest > testKafkaConfigToProps PASSED

kafka.log.LogConfigTest > testFromPropsInvalid STARTED

kafka.log.LogConfigTest > testFromPropsInvalid PASSED

kafka.log.CleanerTest > testBuildOffsetMap STARTED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testBuildOffsetMapFakeLarge STARTED

kafka.l

Build failed in Jenkins: kafka-trunk-jdk7 #1514

2016-09-02 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4104: Queryable state metadata is sometimes invalid

--
[...truncated 4478 lines...]

kafka.network.SocketServerTest > tooBigRequestIsRejected PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue STARTED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask STARTED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask PASSED

kafka.utils.timer.TimerTest > testTaskExpiration STARTED

kafka.utils.timer.TimerTest > testTaskExpiration PASSED

kafka.utils.timer.TimerTaskListTest > testAll STARTED

kafka.utils.timer.TimerTaskListTest > testAll PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.consumer.TopicFilterTest > testWhitelists STARTED

kafka.consumer.TopicFilterTest > testWhitelists PASSED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson STARTED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson PASSED

kafka.consumer.TopicFilterTest > testBlacklists STARTED

kafka.consumer.TopicFilterTest > testBlacklists PASSED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor PASSED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.consumer.ZookeeperConsumerConnectorTest >

[jira] [Updated] (KAFKA-3703) Selector.close() doesn't complete outgoing writes

2016-09-02 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3703:
--
Status: Patch Available  (was: Open)

> Selector.close() doesn't complete outgoing writes
> -
>
> Key: KAFKA-3703
> URL: https://issues.apache.org/jira/browse/KAFKA-3703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Outgoing writes may be discarded when a connection is closed. For instance, 
> when running a producer with acks=0, a producer that writes data and closes 
> the producer would expect to see all writes to complete if there are no 
> errors. But close() simply closes the channel and socket which could result 
> in outgoing data being discarded.
> This is also an issue in consumers which use commitAsync to commit offsets. 
> Closing the consumer may result in commits being discarded because writes 
> have not completed before close().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3703) Selector.close() doesn't complete outgoing writes

2016-09-02 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3703:
--
Description: 
Outgoing writes may be discarded when a connection is closed. For instance, 
when running a producer with acks=0, a producer that writes data and closes the 
producer would expect to see all writes to complete if there are no errors. But 
close() simply closes the channel and socket which could result in outgoing 
data being discarded.

This is also an issue in consumers which use commitAsync to commit offsets. 
Closing the consumer may result in commits being discarded because writes have 
not completed before close().
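
For illustration only, here is a minimal producer that follows the pattern 
described above (assuming a broker on localhost:9092 and an existing topic 
"test"); with acks=0 there is nothing to wait on, so pending writes can be 
lost if close() does not flush them first:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksZeroCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // fire-and-forget, no broker acknowledgement

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<>("test", "key-" + i, "value-" + i));
        }
        // With acks=0 there is no acknowledgement to wait on; if close() does not complete
        // the outgoing writes first, some of the 100 records may silently be discarded.
        producer.close();
    }
}
{code}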

  was:Outgoing writes may be discarded when a connection is closed. For 
instance, when running a producer with acks=0, a producer that writes data and 
closes the producer would expect to see all writes to complete if there are no 
errors. But close() simply closes the channel and socket which could result in 
outgoing data being discarded.


> Selector.close() doesn't complete outgoing writes
> -
>
> Key: KAFKA-3703
> URL: https://issues.apache.org/jira/browse/KAFKA-3703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Outgoing writes may be discarded when a connection is closed. For instance, 
> when running a producer with acks=0, a producer that writes data and closes 
> the producer would expect to see all writes to complete if there are no 
> errors. But close() simply closes the channel and socket which could result 
> in outgoing data being discarded.
> This is also an issue in consumers which use commitAsync to commit offsets. 
> Closing the consumer may result in commits being discarded because writes 
> have not completed before close().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3703) Selector.close() doesn't complete outgoing writes

2016-09-02 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3703:
--
Affects Version/s: 0.10.0.1
  Component/s: clients
  Summary: Selector.close() doesn't complete outgoing writes  (was: 
PlaintextTransportLayer.close() doesn't complete outgoing writes)

> Selector.close() doesn't complete outgoing writes
> -
>
> Key: KAFKA-3703
> URL: https://issues.apache.org/jira/browse/KAFKA-3703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Outgoing writes may be discarded when a connection is closed. For instance, 
> when running a producer with acks=0, a producer that writes data and closes 
> the producer would expect to see all writes to complete if there are no 
> errors. But close() simply closes the channel and socket which could result 
> in outgoing data being discarded.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1817: KAFKA-3703: Flush outgoing writes before closing c...

2016-09-02 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/1817

KAFKA-3703: Flush outgoing writes before closing client selector

Close client connections only after outgoing writes complete or timeout.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1817


commit 50f009bebe0beaf55cb5e00f9db8fcb626f1399a
Author: Rajini Sivaram 
Date:   2016-09-02T07:55:49Z

KAFKA-3703: Flush outgoing writes before closing client selector




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3703) PlaintextTransportLayer.close() doesn't complete outgoing writes

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457896#comment-15457896
 ] 

ASF GitHub Bot commented on KAFKA-3703:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/1817

KAFKA-3703: Flush outgoing writes before closing client selector

Close client connections only after outgoing writes complete or timeout.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-3703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1817


commit 50f009bebe0beaf55cb5e00f9db8fcb626f1399a
Author: Rajini Sivaram 
Date:   2016-09-02T07:55:49Z

KAFKA-3703: Flush outgoing writes before closing client selector




> PlaintextTransportLayer.close() doesn't complete outgoing writes
> 
>
> Key: KAFKA-3703
> URL: https://issues.apache.org/jira/browse/KAFKA-3703
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>
> Outgoing writes may be discarded when a connection is closed. For instance, 
> when running a producer with acks=0, a producer that writes data and closes 
> the producer would expect to see all writes to complete if there are no 
> errors. But close() simply closes the channel and socket which could result 
> in outgoing data being discarded.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4116) Specifying 0.0.0.0 in "listeners" doesn't work

2016-09-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457774#comment-15457774
 ] 

Yuto Kawamura commented on KAFKA-4116:
--

[~gwenshap] PTAL.

> Specifying 0.0.0.0 in "listeners" doesn't work
> --
>
> Key: KAFKA-4116
> URL: https://issues.apache.org/jira/browse/KAFKA-4116
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.0.2
>
>
> The document of {{listeners}} says:
> "Specify hostname as 0.0.0.0 to bind to all interfaces."
> However, when I give a config such as the one below, the started Kafka broker 
> can't join the cluster because an invalid address is advertised on ZK.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> # advertised.listeners=
> {code}
> This is because:
> - {{advertised.listeners}}, which is used as the address to publish on ZK, 
> defaults to {{listeners}}
> - KafkaHealthcheck#register doesn't treat the host "0.0.0.0" as a special 
> case: 
> https://github.com/apache/kafka/blob/8f3462552fa4d6a6d70a837c2ef7439bba512657/core/src/main/scala/kafka/server/KafkaHealthcheck.scala#L60-L61
> h3. Proof
> Test environment:
> - kafka-broker version 0.10.1.0-SNAPSHOT (built from trunk)
> - Brokers HOST-A, HOST-B, HOST-C
> - Controller: HOST-A
> - topic-A has 3 replicas, 3 partitions
> Update HOST-B's server.properties, changing listeners to the value below, and 
> restart the broker.
> {code}
> listeners=PLAINTEXT://0.0.0.0:9092
> {code}
> Then HOST-B registers its broker info at ZK path {{/brokers/ids/2}}, but 
> "0.0.0.0" is used as its host:
> {code}
> [zk: ZKHOST1:2181,ZKHOST2:2181,ZKHOST3:2181/kafka-test(CONNECTED) 8] get /brokers/ids/2
> {"jmx_port":12345,"timestamp":"1472796372181","endpoints":["PLAINTEXT://0.0.0.0:9092"],"host":"0.0.0.0","version":3,"port":9092}
> {code}
> The controller tries to send a request to the above address, but of course it 
> will never reach HOST-B.
> controller.log:
> {code}
> [2016-09-02 15:06:12,206] INFO [Controller-1-to-broker-2-send-thread], Controller 1 connected to 0.0.0.0:9092 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
> {code}
> I'm guessing the controller may be sending the request to itself (the Kafka 
> broker running on the same instance), since calling connect("0.0.0.0") 
> results in a connection to localhost, which sounds scary but I haven't dug 
> into it.
> So the ISR is not recovered even though the broker starts up.
> {code}
> ./kafka-topics.sh ... --describe --topic topic-A
> Topic:topic-A   PartitionCount:3        ReplicationFactor:3     Configs:retention.ms=8640,min.insync.replicas=2
> Topic: topic-A  Partition: 0    Leader: 3       Replicas: 3,2,1 Isr: 1,3
> Topic: topic-A  Partition: 1    Leader: 1       Replicas: 1,3,2 Isr: 1,3
> Topic: topic-A  Partition: 2    Leader: 1       Replicas: 2,1,3 Isr: 1,3
> {code}
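
One workaround under the current behaviour, sketched with a hypothetical hostname, is to keep the wildcard bind but set {{advertised.listeners}} explicitly so the address published to ZK does not inherit 0.0.0.0 from {{listeners}}:

{code}
# server.properties on HOST-B (host name is illustrative)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://host-b.example.com:9092
{code}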





[jira] [Updated] (KAFKA-4116) Specifying 0.0.0.0 in "listeners" doesn't work

2016-09-02 Thread Yuto Kawamura (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuto Kawamura updated KAFKA-4116:
-
Status: Patch Available  (was: Open)

> Specifying 0.0.0.0 in "listeners" doesn't work
> --
>
> Key: KAFKA-4116
> URL: https://issues.apache.org/jira/browse/KAFKA-4116
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1, 0.9.0.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.0.2
>





[jira] [Commented] (KAFKA-4116) Specifying 0.0.0.0 in "listeners" doesn't work

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457768#comment-15457768
 ] 

ASF GitHub Bot commented on KAFKA-4116:
---

GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1816

KAFKA-4116: Handle 0.0.0.0 as a special case when using advertised.listeners

Issue: https://issues.apache.org/jira/browse/KAFKA-4116

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka KAFKA-4116-listeners

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1816


commit 0eec0393b41e4e75d942b3431e328d5acc18ca7f
Author: Yuto Kawamura 
Date:   2016-09-02T07:11:04Z

KAFKA-4116: Handle 0.0.0.0 as a special case when using advertised.listeners
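
A rough sketch of the special-case handling the title describes (not the actual patch, which touches the broker's registration code; the class, method name and signature are illustrative): when the host that would be advertised is empty or 0.0.0.0, substitute the machine's canonical host name before publishing the endpoint to ZK.

{code}
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedHost {
    // Illustrative only: never publish the wildcard address to ZooKeeper,
    // since other brokers cannot connect to "0.0.0.0".
    public static String resolve(String configuredHost) throws UnknownHostException {
        if (configuredHost == null || configuredHost.trim().isEmpty() || "0.0.0.0".equals(configuredHost))
            return InetAddress.getLocalHost().getCanonicalHostName();
        return configuredHost;
    }
}
{code}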




> Specifying 0.0.0.0 in "listeners" doesn't work
> --
>
> Key: KAFKA-4116
> URL: https://issues.apache.org/jira/browse/KAFKA-4116
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.0.2
>





[GitHub] kafka pull request #1816: KAFKA-4116: Handle 0.0.0.0 as a special case when ...

2016-09-02 Thread kawamuray
GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1816

KAFKA-4116: Handle 0.0.0.0 as a special case when using advertised.listeners

Issue: https://issues.apache.org/jira/browse/KAFKA-4116

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka KAFKA-4116-listeners

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1816


commit 0eec0393b41e4e75d942b3431e328d5acc18ca7f
Author: Yuto Kawamura 
Date:   2016-09-02T07:11:04Z

KAFKA-4116: Handle 0.0.0.0 as a special case when using advertised.listeners



