Build failed in Jenkins: kafka-trunk-jdk7 #1867

2017-01-23 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4671: Fix Streams window retention policy

[jason] MINOR: Add offset information to consumer_test error messages

--
[...truncated 19456 lines...]
org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldEvictEldestEntry PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldBeReentrantAndNotBreakLRU STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldBeReentrantAndNotBreakLRU PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldDeleteAndUpdateSize STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldDeleteAndUpdateSize PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutAll STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutAll PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutGet STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutGet PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey
 STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldNotThrowIllegalArgumentAfterEvictingDirtyRecordAndThenPuttingNewRecordWithSameKey
 PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldGetIteratorOverAllKeys STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldGetIteratorOverAllKeys PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldRemoveDeletedValuesOnFlush STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldRemoveDeletedValuesOnFlush PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > testMetrics STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > testMetrics PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldGetRangeIteratorOverKeys STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldGetRangeIteratorOverKeys PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldFlushDirtEntriesOnEviction STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldFlushDirtEntriesOnEviction PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutIfAbsent 
STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldPutIfAbsent 
PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry 
STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldThrowIllegalStateExceptionWhenTryingToOverwriteDirtyEntryWithCleanEntry 
PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldReturnNullIfKeyIsNull STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > 
shouldReturnNullIfKeyIsNull PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldOverwriteAll 
STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldOverwriteAll 
PASSED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldKeepTrackOfSize 
STARTED

org.apache.kafka.streams.state.internals.NamedCacheTest > shouldKeepTrackOfSize 
PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteAllKeyValueToInnerStoreOnPutAll STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteAllKeyValueToInnerStoreOnPutAll PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldLogChangeOnPut STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldLogChangeOnPut PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldPutNullOnDelete STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldPutNullOnDelete PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteToInnerOnPutIfAbsentNoPreviousValue STARTED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteToInnerOnPutIfAbsentNoPreviousValue PASSED

org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStoreTest > 
shouldWriteKeyValueBytesToInnerStoreOnPut STARTED


Re: [DISCUSS] KIP-111 : Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-01-23 Thread Mayuresh Gharat
Hi Rajini,

Thanks a lot for the review. Please see the comments inline:

It feels like the goal is to expose custom Principal as an
opaque object between PrincipalBuilder and Authorizer so that Kafka doesn't
really need to know anything about additional stuff added for
customization. But kafka-acls.sh is expecting a key-value map from which
Principal is constructed. This is a breaking change to the PrincipalBuilder
interface - and I am not sure what it achieves.
-> kafka-acls is a command-line tool where currently we just specify
the "names" of the principals that are allowed or denied.
The Principal generated by the PrincipalBuilder is still opaque, and Kafka as
such does not need to know the details.
The key-value map that is being passed in will be used specifically by the
user's PrincipalBuilder to create the Principal. The main motivation of the
KIP is that the Principal built by the PrincipalBuilder can have other
fields apart from the "name", which are currently ignored. Allowing
key-value pairs to be passed in will enable the PrincipalBuilder to create
such a Principal.

1. A custom Principal is (a) created during authentication using custom
PrincipalBuilder (b) checked during authorization using Principal.equals()
and (c) stored in Zookeeper using Principal.toString(). Is that correct?
-> The authorization will be done as per the user-supplied Authorizer.
As not everyone might be using ZooKeeper for storing ACLs, its storage is
again dependent on the Authorizer implementation.

2. Is the reason for the new parameters in kafka-acls.sh and the breaking
change in PrincipalBuilder interface to enable users to specify a Principal
using properties rather than create the String in 1c) themselves?
-> Please see the explanation above.

3. Since the purpose of the new PrincipalBuilder method
buildPrincipal(Map principalConfigs) is to create a new Principal from command line
parameters, perhaps Properties or Map would be more
appropriate?
-> Yes, we could, but I actually prefer to keep it similar to the
configure(Map configs) API.


Hi Ismael,

Thanks a lot for the review. Please see the comments inline.

1. PrincipalBuilder implements Configurable and gets a map of properties
via the `configure` method. Do we really need a new `buildPrincipal` method
given that?
--> The configure() API will actually be used to configure the
PrincipalBuilder in the same way as the Authorizer. The buildPrincipal()
API will be used by the PrincipalBuilder to build individual principals.
Each of these principals can be of a different custom type, such as a
GroupPrincipal or ServicePrincipal, based on the Map of
principalConfigs provided to the buildPrincipal() API.
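
To make the idea concrete, here is a minimal sketch of what a custom builder
with the proposed method could look like. The GroupPrincipal class, the
"group" property key, and the exact buildPrincipal(Map) signature are
illustrative assumptions based on this discussion, not the final KIP
interface:

    import java.security.Principal;
    import java.util.Map;

    // Hypothetical custom principal type that carries more than just a name.
    class GroupPrincipal implements Principal {
        private final String name;
        private final String group;

        GroupPrincipal(String name, String group) {
            this.name = name;
            this.group = group;
        }

        @Override
        public String getName() { return name; }

        public String getGroup() { return group; }
    }

    // Sketch of a custom builder; buildPrincipal(Map) is the proposed KIP-111
    // addition as described in this thread, not an existing Kafka API.
    class CustomPrincipalBuilder {

        public void configure(Map<String, ?> configs) {
            // one-time configuration, analogous to the Authorizer's configure()
        }

        // Builds a principal from key-value properties, e.g. the ones supplied
        // through the kafka-acls command line.
        public Principal buildPrincipal(Map<String, ?> principalConfigs) {
            String name = (String) principalConfigs.get("name");   // assumed key
            String group = (String) principalConfigs.get("group"); // assumed key
            return new GroupPrincipal(name, group);
        }
    }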

2. Jun suggested in the JIRA that it may make sense to pass the
`channelPrincipal` as a field in `Session` instead of `KafkaPrincipal`. It
would be good to understand why this was rejected.
-> Now I understand what Jun meant by "Perhaps, we could extend the
Session object with channelPrincipal instead." Thinking more about this,
there is a PrincipalType in KafkaPrincipal that was, I think, added for a
specific purpose when it was first created. I thought that we should
preserve it in case it is useful in the future.

Thanks,

Mayuresh





On Mon, Jan 23, 2017 at 8:56 AM, Ismael Juma  wrote:

> Hi Mayuresh,
>
> Thanks for updating the KIP. A couple of questions:
>
> 1. PrincipalBuilder implements Configurable and gets a map of properties
> via the `configure` method. Do we really need a new `buildPrincipal` method
> given that?
>
> 2. Jun suggested in the JIRA that it may make sense to pass the
> `channelPrincipal` as a field in `Session` instead of `KafkaPrincipal`. It
> would be good to understand why this was rejected.
>
> Ismael
>
> On Thu, Jan 12, 2017 at 7:07 PM, Ismael Juma  wrote:
>
> > Hi Mayuresh,
> >
> > Thanks for the KIP. A quick comment before I do a more detailed analysis,
> > the KIP says:
> >
> > `This KIP is a pure addition to existing functionality and does not
> > include any backward incompatible changes.`
> >
> > However, the KIP is proposing the addition of a method to the
> > PrincipalBuilder pluggable interface, which is not a compatible change.
> > Existing implementations would no longer compile, for example. It would
> be
> > good to make this clear in the KIP.
> >
> > Ismael
> >
> > On Thu, Jan 12, 2017 at 5:44 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> >> Hi all.
> >>
> >> We created KIP-111 to propose that Kafka should preserve the Principal
> >> generated by the PrincipalBuilder while processing the request received
> on
> >> socket channel, on the broker.
> >>
> >> Please find the KIP wiki in the link
> >> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=67638388
> >> .
> >> We would love to hear your comments and suggestions.
> >>
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >

[GitHub] kafka pull request #2428: HOTIFX: streams system test do not start up correc...

2017-01-23 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2428

HOTIFX: streams system test do not start up correctly



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka hotfixSystemTests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2428.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2428






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-01-23 Thread Jason Gustafson
Good point. The consumer does use a separate connection to the coordinator,
so perhaps the connection itself could be tagged for normal heap allocation?

-Jason
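
To illustrate the combination being discussed (a tagged coordinator
connection plus the 1 KB small-message carve-out from the KIP's caveat),
here is a minimal sketch. The MemoryPool interface and all names below are
assumptions for illustration, not the actual implementation:

    import java.nio.ByteBuffer;

    // Hypothetical bounded pool along the lines of the KIP-81 proposal.
    interface MemoryPool {
        ByteBuffer tryAllocate(int sizeBytes); // null when the pool is exhausted
        void release(ByteBuffer buffer);
    }

    class ResponseBufferAllocator {
        private static final int SMALL_RESPONSE_THRESHOLD = 1024; // "1Kb" caveat

        private final MemoryPool pool;

        ResponseBufferAllocator(MemoryPool pool) {
            this.pool = pool;
        }

        ByteBuffer allocate(int responseSize, boolean coordinatorConnection) {
            // Small responses, or anything on the tagged coordinator connection,
            // bypass the pool so group/heartbeat traffic cannot be starved.
            if (coordinatorConnection || responseSize < SMALL_RESPONSE_THRESHOLD) {
                return ByteBuffer.allocate(responseSize);
            }
            // Fetch responses come out of the bounded pool; a null result means
            // "stop reading from this connection until memory is freed".
            return pool.tryAllocate(responseSize);
        }
    }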

On Mon, Jan 23, 2017 at 10:26 AM, Onur Karaman  wrote:

> I only did a quick scan but I wanted to point out what I think is an
> incorrect assumption in the KIP's caveats:
> "
> There is a risk using the MemoryPool that, after we fill up the memory with
> fetch data, we can starve the coordinator's connection
> ...
> To alleviate this issue, only messages larger than 1Kb will be allocated in
> the MemoryPool. Smaller messages will be allocated directly on the Heap
> like before. This allows group/heartbeat messages to avoid being delayed if
> the MemoryPool fills up.
> "
>
> So it sounds like there's an incorrect assumption that responses from the
> coordinator will always be small (< 1Kb as mentioned in the caveat). There
> are now a handful of request types between clients and the coordinator:
> {JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
> ListGroups, DescribeGroups}. While true (at least today) for
> HeartbeatResponse and a few others, I don't think we can assume
> JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
> OffsetFetchResponse will be small, as they are effectively bounded by the
> max message size allowed by the broker for the __consumer_offsets topic
> which by default is 1MB.
>
> On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison 
> wrote:
>
> > I've updated the KIP to address all the comments raised here and from
> > the "DISCUSS" thread.
> > See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >
> > Now, I'd like to restart the vote.
> >
> > On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
> >  wrote:
> > > Hi Mickael,
> > >
> > > I am +1 on the overall approach of this KIP, but have a couple of
> > comments
> > > (sorry, should have brought them up on the discuss thread earlier):
> > >
> > > 1. Perhaps it would be better to do this after KAFKA-4137
> > >  is implemented? At
> > the
> > > moment, coordinator shares the same NetworkClient (and hence the same
> > > Selector) with consumer connections used for fetching records. Since
> > > freeing of memory relies on consuming applications invoking poll()
> after
> > > processing previous records and potentially after committing offsets,
> it
> > > will be good to ensure that coordinator is not blocked for read by
> fetch
> > > responses. This may be simpler once coordinator has its own Selector.
> > >
> > > 2. The KIP says: *Once messages are returned to the user, messages are
> > > deleted from the MemoryPool so new messages can be stored.*
> > > Can you expand that a bit? I am assuming that partial buffers never get
> > > freed when some messages are returned to the user since the consumer is
> > > still holding a reference to the buffer. Would buffers be freed when
> > > fetches for all the partitions in a response are parsed, but perhaps
> not
> > > yet returned to the user (i.e., is the memory freed when a reference to
> > the
> > > response buffer is no longer required)? It will be good to document the
> > > (approximate) maximum memory requirement for the non-compressed case.
> > There
> > > is data read from the socket, cached in the Fetcher and (as Radai has
> > > pointed out), the records still with the user application.
> > >
> > >
> > > On Tue, Dec 6, 2016 at 2:04 AM, radai 
> > wrote:
> > >
> > >> +1 (non-binding).
> > >>
> > >> small nit pick - just because you returned a response to the user doesn't
> > >> mean the memory is no longer used. For some cases the actual "point of
> > >> termination" may be the deserializer (really impl-dependent), but
> > >> generally, wouldn't it be "nice" to have an explicit dispose() call on
> > >> responses (with the addition that getting the next batch of data from a
> > >> consumer automatically disposes the previous results)?
> > >>
> > >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar 
> > wrote:
> > >>
> > >> > +1 (non binding)
> > >> > --
> > >> > Edoardo Comar
> > >> > IBM MessageHub
> > >> > eco...@uk.ibm.com
> > >> > IBM UK Ltd, Hursley Park, SO21 2JN
> > >> >
> > >> > IBM United Kingdom Limited Registered in England and Wales with
> number
> > >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth,
> Hants.
> > >> PO6
> > >> > 3AU
> > >> >
> > >> >
> > >> >
> > >> > From:   Mickael Maison 
> > >> > To: dev@kafka.apache.org
> > >> > Date:   05/12/2016 14:38
> > >> > Subject:[VOTE] KIP-81: Bound Fetch memory usage in the
> > consumer
> > >> >
> > >> >
> > >> >
> > >> > Hi all,
> > >> >
> > >> > I'd like to start the vote for KIP-81:
> > >> > 

[GitHub] kafka pull request #2426: MINOR: Add offset information to consumer_test err...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2426




[GitHub] kafka pull request #2401: KAFKA-4671: Fix Streams window retention policy

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2401




Re: [VOTE] KIP-107 Add purgeDataBefore() API in AdminClient

2017-01-23 Thread Dong Lin
Hi all,

While implementing the patch, I realized that the current usage of
"low_watermark" is a bit confusing, so I made the following interface
changes in the KIP:

- The newly added checkpoint file will be named log-begin-offset-checkpoint
- Replace low_watermark with log_begin_offset in FetchRequestPartition and
FetchResponsePartitionHeader

The problem with the previous naming convention is that low_watermark
implies the minimum log begin offset of all replicas (similar to the high
watermark), and we return this value in the PurgeResponse. In other words,
low_watermark cannot be incremented if a follower is not live. Therefore
we cannot use low_watermark in the checkpoint file or in the FetchResponse
from leader to followers if we want to persist the offset-to-purge
received from the user across broker restarts.

You can find the changes in the KIP here.
Please let me know if you have any concerns with this change.

Thanks,
Dong

On Mon, Jan 23, 2017 at 11:20 AM, Dong Lin  wrote:

> Thanks for the comment Jun.
>
> Yeah, I think there is use-case where this can be useful. Allowing for
> asynchronous delete will be useful if an application doesn't need strong
> guarantee of purgeDataFrom(), e.g. if it is done to help reduce disk usage
> of kafka. The application may want to purge data for every time it does
> auto-commit without wait for future object to complete. On the other hand,
> synchronous delete will be useful if an application wants to make sure that
> the sensitive or bad data is definitely deleted. I think returning a future
> makes both choice available to user and it doesn't complicate
> implementation much.
>
>
> On Mon, Jan 23, 2017 at 10:45 AM, Jun Rao  wrote:
>
>> I feel that it's simpler to just keep the format of the checkpoint file as
>> it is and just add a separate checkpoint for low watermark. Low watermark
>> and high watermark are maintained independently. So, not sure if there is
>> significant benefit of storing them together.
>>
>> Looking at the KIP again. I actually have another question on the api. Is
>> there any benefit of returning a Future in the purgeDataBefore() api?
>> Since
>> admin apis are used infrequently, it seems that it's simpler to just have
>> a
>> blocking api and returns Map?
>>
>> Thanks,
>>
>> Jun
>>
>> On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin  wrote:
>>
>> > Thanks for the comment Guozhang. Please don't worry about being late. I
>> > would like to update the KIP if there is clear benefit of the new
>> approach.
>> > I am wondering if there is any use-case or operation aspects that would
>> > benefit from the new approach.
>> >
>> > I am not saying that these checkpoint files have the same priority. I
>> > mentioned other checkpoint files to suggest that it is OK to add one
>> more
>> > checkpoint file. To me three checkpoint files is not much different from
>> > four checkpoint files. I am just inclined to not update the KIP if the
>> only
>> > benefit is to avoid addition of a new checkpoint file.
>> >
>> >
>> >
>> > On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang 
>> wrote:
>> >
>> > > To me the distinction between recovery-checkpoint and
>> > > replication-checkpoint are different from the distinction between
>> these
>> > two
>> > > hw checkpoint values: when broker starts up and act as the leader for
>> a
>> > > partition, it can live without seeing the recovery checkpoint, but
>> just
>> > > cannot rely on the existing last log segment and need to fetch from
>> other
>> > > replicas; but if the replication-checkpoint file is missing, it is a
>> > > correctness issue, as it does not know from where to truncate its
>> data,
>> > and
>> > > also how to respond to a fetch request. That is why I think we can
>> > separate
>> > > these two types of files, since the latter one is more important than
>> the
>> > > previous one.
>> > >
>> > > That being said, I do not want to recall another vote on this since
>> it is
>> > > my bad not responding before the vote is called. Just wanted to point
>> out
>> > > for the record that this approach may have some operational scenarios
>> > where
>> > > one of the replication files is missing and we need to treat them
>> > > specifically.
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin 
>> wrote:
>> > >
>> > > > Yeah, your solution of adding new APIs certainly works and I don't
>> > think
>> > > > that is an issue. On the other hand I don't think it is an issue to
>> > add a
>> > > > new checkpoint file as well since we already have multiple
>> checkpoint
>> > > > files. The benefit of the new approach you mentioned is probably
>> not an
>> > > > issue in the current approach since high watermark and low watermark
>> > > works
>> > > > completely 

[GitHub] kafka pull request #2427: KAFKA-4673: Fix thread-safety of Python Verifiable...

2017-01-23 Thread ewencp
GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/2427

KAFKA-4673: Fix thread-safety of Python VerifiableConsumer class



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka 
kafka-4673-verifiable-consumer-thread-safety

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2427


commit bb31bc597a0ae46dbed27ed73432aaaf5c005430
Author: Ewen Cheslack-Postava 
Date:   2017-01-24T00:03:13Z

KAFKA-4673: Fix thread-safety of Python VerifiableConsumer class






Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2017-01-23 Thread Ismael Juma
Hi Roger,

SCRAM uses the PBKDF2 mechanism; here's a comparison between PBKDF2 and
bcrypt:

http://security.stackexchange.com/questions/4781/do-any-security-experts-recommend-bcrypt-for-password-storage/6415#6415

It may be worth supporting bcrypt, but not sure it would make sense to do
it in the context of SCRAM.

A minor correction: the KIP includes SCRAM-SHA-256 and SCRAM-SHA-512 (not
SCRAM-SHA-1).

Ismael
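
For readers unfamiliar with the mechanism: SCRAM's salted password is derived
with PBKDF2 using HMAC over the chosen hash (RFC 5802/7677). A self-contained
sketch using the standard JCA provider is below; the iteration count and key
length are example values, not the ones Kafka uses:

    import javax.crypto.SecretKeyFactory;
    import javax.crypto.spec.PBEKeySpec;
    import java.security.SecureRandom;

    // Illustration of deriving a SCRAM-SHA-256 style salted password with PBKDF2.
    public class ScramSaltedPasswordExample {
        public static void main(String[] args) throws Exception {
            char[] password = "secret-password".toCharArray();
            byte[] salt = new byte[16];
            new SecureRandom().nextBytes(salt);
            int iterations = 4096;    // example iteration count
            int keyLengthBits = 256;  // matches the SHA-256 digest length

            SecretKeyFactory factory =
                SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            byte[] saltedPassword = factory
                .generateSecret(new PBEKeySpec(password, salt, iterations, keyLengthBits))
                .getEncoded();

            System.out.println("salted password is " + saltedPassword.length + " bytes");
        }
    }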

On Mon, Jan 23, 2017 at 10:49 PM, Roger Hoover 
wrote:

> Sorry for the late question but is there a reason to choose SHA-1 and
> SHA-256 instead of bcrypt?
>
> https://codahale.com/how-to-safely-store-a-password/
>
> On Fri, Nov 11, 2016 at 5:30 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > I think all the comments and suggestions on this thread have now been
> > incorporated into the KIP. If there are no objections, I will start the
> > voting process on Monday.
> >
> > Regards,
> >
> > Rajini
> >
> > On Tue, Nov 8, 2016 at 9:20 PM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > Jun,
> > >
> > > Have added a sub-section on delegation token support to the KIP.
> > >
> > > Thank you,
> > >
> > > Rajini
> > >
> > > On Tue, Nov 8, 2016 at 8:07 PM, Jun Rao  wrote:
> > >
> > >> Hi, Rajini,
> > >>
> > >> That makes sense. Could you document this potential future extension
> in
> > >> the
> > >> KIP?
> > >>
> > >> Jun
> > >>
> > >> On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
> > >> rajinisiva...@googlemail.com> wrote:
> > >>
> > >> > Jun,
> > >> >
> > >> > 11. SCRAM messages have an optional extensions field which is a list
> > of
> > >> > key=value pairs. We can add an extension key to the first client
> > >> message to
> > >> > indicate delegation token. Broker can then obtain credentials and
> > >> principal
> > >> > using a different code path for delegation tokens.
> > >> >
> > >> > On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:
> > >> >
> > >> > > Magnus,
> > >> > >
> > >> > > Thanks for the input. If you don't feel strongly the need to bump
> up
> > >> the
> > >> > > version of SaslHandshake, we can leave the version unchanged.
> > >> > >
> > >> > > Rajini,
> > >> > >
> > >> > > 11. Yes, we could send the HMAC as the SCRAM password for the
> > >> delegation
> > >> > > token. Do we need something to indicate that this SCRAM token is
> > >> special
> > >> > > (i.e., delegation token) so that we can generate the correct
> > >> > > KafkaPrincipal? The delegation token logic can be added later. I
> am
> > >> > asking
> > >> > > just so that we have enough in the design of SCRAM to add the
> > >> delegation
> > >> > > token logic later.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> > >> > > rajinisiva...@googlemail.com
> > >> > > > wrote:
> > >> > >
> > >> > > > Hi Jun,
> > >> > > >
> > >> > > > 10. *s=* and *i=* come from the SCRAM standard
> > >> (they
> > >> > > are
> > >> > > > transferred during SCRAM auth). Scram messages look like (for
> > >> example)
> > >> > > > *r=,s=,i=*. StoredKey and ServerKey
> and
> > >> not
> > >> > > > transferred in SCRAM messages, so I picked two keys that are
> > unused
> > >> in
> > >> > > > SCRAM.
> > >> > > >
> > >> > > > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared
> secret/password
> > >> for
> > >> > > > authentication along with a username and an optional
> > >> authorization-id.
> > >> > > > Kafka uses the username as the identity (Kafka principal) for
> > >> > > > authentication and authorization. KIP-48 doesn't mention
> > >> KafkaPrincipal
> > >> > > in
> > >> > > > the section "Authentication using Token", but a delegation token
> > is
> > >> > > > associated with a Kafka principal. Since delegation tokens are
> > >> acquired
> > >> > > on
> > >> > > > behalf of a KafkaPrincipal and the principal is included in the
> > >> token
> > >> > as
> > >> > > > the token owner,  clients authenticating with delegation tokens
> > >> could
> > >> > use
> > >> > > > the token owner as username and the token HMAC as shared
> > >> > secret/password.
> > >> > > >
> > >> > > > If necessary, any other form of token identifier may be used as
> > >> > username
> > >> > > as
> > >> > > > well as long as it contains sufficient information for the
> broker
> > to
> > >> > > > retrieve/compute the principal and HMAC for authentication. The
> > >> server
> > >> > > > callback handler can be updated when delegation tokens are
> > >> implemented
> > >> > to
> > >> > > > generate Kafka principal accordingly.
> > >> > > >
> > >> > > >
> > >> > > > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao 
> wrote:
> > >> > > >
> > >> > > > > Hi, Rajini,
> > >> > > > >
> > >> > > > > A couple of other questions on the KIP.
> > >> > > > >
> > >> > > > > 10. For the config values stored in ZK, are those keys (s, t,
> k,
> > >> i,
> > >> > > etc)
> > >> > > > > stored under 

[GitHub] kafka pull request #2426: MINOR: Add offset information to consumer_test err...

2017-01-23 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2426

MINOR: Add offset information to consumer_test error messages



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka 
improve-consumer-test-error-messages

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2426


commit b967c62975ad3a4c487cc57a786a538af121aa78
Author: Jason Gustafson 
Date:   2017-01-23T23:35:31Z

MINOR: Add offset information to consumer_test error messages






Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2017-01-23 Thread Roger Hoover
Sorry for the late question but is there a reason to choose SHA-1 and
SHA-256 instead of bcrypt?

https://codahale.com/how-to-safely-store-a-password/

On Fri, Nov 11, 2016 at 5:30 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> I think all the comments and suggestions on this thread have now been
> incorporated into the KIP. If there are no objections, I will start the
> voting process on Monday.
>
> Regards,
>
> Rajini
>
> On Tue, Nov 8, 2016 at 9:20 PM, Rajini Sivaram <
> rajinisiva...@googlemail.com
> > wrote:
>
> > Jun,
> >
> > Have added a sub-section on delegation token support to the KIP.
> >
> > Thank you,
> >
> > Rajini
> >
> > On Tue, Nov 8, 2016 at 8:07 PM, Jun Rao  wrote:
> >
> >> Hi, Rajini,
> >>
> >> That makes sense. Could you document this potential future extension in
> >> the
> >> KIP?
> >>
> >> Jun
> >>
> >> On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
> >> rajinisiva...@googlemail.com> wrote:
> >>
> >> > Jun,
> >> >
> >> > 11. SCRAM messages have an optional extensions field which is a list
> of
> >> > key=value pairs. We can add an extension key to the first client
> >> message to
> >> > indicate delegation token. Broker can then obtain credentials and
> >> principal
> >> > using a different code path for delegation tokens.
> >> >
> >> > On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:
> >> >
> >> > > Magnus,
> >> > >
> >> > > Thanks for the input. If you don't feel strongly the need to bump up
> >> the
> >> > > version of SaslHandshake, we can leave the version unchanged.
> >> > >
> >> > > Rajini,
> >> > >
> >> > > 11. Yes, we could send the HMAC as the SCRAM password for the
> >> delegation
> >> > > token. Do we need something to indicate that this SCRAM token is
> >> special
> >> > > (i.e., delegation token) so that we can generate the correct
> >> > > KafkaPrincipal? The delegation token logic can be added later. I am
> >> > asking
> >> > > just so that we have enough in the design of SCRAM to add the
> >> delegation
> >> > > token logic later.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> >> > > rajinisiva...@googlemail.com
> >> > > > wrote:
> >> > >
> >> > > > Hi Jun,
> >> > > >
> >> > > > 10. *s=* and *i=* come from the SCRAM standard
> >> (they
> >> > > are
> >> > > > transferred during SCRAM auth). Scram messages look like (for
> >> example)
> >> > > > *r=,s=,i=*. StoredKey and ServerKey and
> >> not
> >> > > > transferred in SCRAM messages, so I picked two keys that are
> unused
> >> in
> >> > > > SCRAM.
> >> > > >
> >> > > > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password
> >> for
> >> > > > authentication along with a username and an optional
> >> authorization-id.
> >> > > > Kafka uses the username as the identity (Kafka principal) for
> >> > > > authentication and authorization. KIP-48 doesn't mention
> >> KafkaPrincipal
> >> > > in
> >> > > > the section "Authentication using Token", but a delegation token
> is
> >> > > > associated with a Kafka principal. Since delegation tokens are
> >> acquired
> >> > > on
> >> > > > behalf of a KafkaPrincipal and the principal is included in the
> >> token
> >> > as
> >> > > > the token owner,  clients authenticating with delegation tokens
> >> could
> >> > use
> >> > > > the token owner as username and the token HMAC as shared
> >> > secret/password.
> >> > > >
> >> > > > If necessary, any other form of token identifier may be used as
> >> > username
> >> > > as
> >> > > > well as long as it contains sufficient information for the broker
> to
> >> > > > retrieve/compute the principal and HMAC for authentication. The
> >> server
> >> > > > callback handler can be updated when delegation tokens are
> >> implemented
> >> > to
> >> > > > generate Kafka principal accordingly.
> >> > > >
> >> > > >
> >> > > > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
> >> > > >
> >> > > > > Hi, Rajini,
> >> > > > >
> >> > > > > A couple of other questions on the KIP.
> >> > > > >
> >> > > > > 10. For the config values stored in ZK, are those keys (s, t, k,
> >> i,
> >> > > etc)
> >> > > > > stored under scram-sha-256 standard?
> >> > > > >
> >> > > > > 11. Could KIP-48 (delegation token) use this KIP to send
> >> delegation
> >> > > > tokens?
> >> > > > > In KIP-48, the client sends a HMAC as the delegation token to
> the
> >> > > server.
> >> > > > > Not sure how this gets mapped to the username/password in this
> >> KIP.
> >> > > > >
> >> > > > > Thanks,
> >> > > > >
> >> > > > > Jun
> >> > > > >
> >> > > > > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> >> > > > > rajinisiva...@googlemail.com
> >> > > > > > wrote:
> >> > > > >
> >> > > > > > Hi all,
> >> > > > > >
> >> > > > > > I have just created KIP-84 to add SCRAM-SHA-1 and
> SCRAM-SHA-256
> >> > SASL
> >> > > > > > mechanisms to Kafka:
> >> > > > > >
> >> > > > > > 

[GitHub] kafka pull request #2425: KAFKA-4687: Fix InvalidTopicException due to topic...

2017-01-23 Thread noslowerdna
GitHub user noslowerdna opened a pull request:

https://github.com/apache/kafka/pull/2425

KAFKA-4687: Fix InvalidTopicException due to topic creation race condition

Pull request for https://issues.apache.org/jira/browse/KAFKA-4687; complete
details can be found in the Jira.

With this change, instead of incorrectly throwing an InvalidTopicException, 
a TopicExistsException would be thrown 
[here](https://github.com/apache/kafka/blob/0.10.1.1/core/src/main/scala/kafka/admin/AdminUtils.scala#L474).

Note that an alternative (or possibly additional) change could be to 
proactively check if the retrieved topic inventory 
[here](https://github.com/apache/kafka/blob/0.10.1.1/core/src/main/scala/kafka/admin/AdminUtils.scala#L432)
 contains the topic being created, and throw a TopicExistsException at that 
point. Let me know if that would be preferred, and I'll update the code 
accordingly.
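
For callers, the practical effect is that a create-topic race becomes an
exception they can safely swallow. A sketch of that calling pattern is below;
the TopicCreator interface is a hypothetical stand-in for whatever admin
utility is used, and the Java client's TopicExistsException is used purely
for illustration:

    import org.apache.kafka.common.errors.TopicExistsException;

    class EnsureTopicExample {

        // Hypothetical admin abstraction; stands in for AdminUtils or similar.
        interface TopicCreator {
            void createTopic(String topic, int partitions, int replicationFactor);
        }

        static void ensureTopicExists(TopicCreator creator, String topic) {
            try {
                creator.createTopic(topic, 1, 1);
            } catch (TopicExistsException e) {
                // Another client won the creation race; the topic exists,
                // so this can be treated as success.
            }
        }
    }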

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/noslowerdna/kafka KAFKA-4687

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2425


commit 8d5ab380868d2be72992701174edf9ab26928ca5
Author: Andrew Olson 
Date:   2017-01-23T22:40:06Z

KAFKA-4687: Fix InvalidTopicException due to topic creation race condition






Re: [VOTE] KIP-74: Add FetchResponse size limit in bytes

2017-01-23 Thread Colin McCabe
Thanks, all.  I edited the wiki to reflect the implemented behavior by
dropping references to special behavior when max_bytes was INT_MAX.

cheers,
Colin
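
As a reminder of how these limits surface on the consumer side, here is a
small configuration sketch; the byte values are arbitrary examples. With the
behavior described above, a single oversized message is still returned, so
the consumer never gets stuck:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class FetchSizeConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // Upper bound on the whole fetch response (added by KIP-74 in 0.10.1).
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
            // Upper bound per partition within that response.
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // subscribe and poll as usual ...
            }
        }
    }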


On Sat, Jan 21, 2017, at 09:44, radai wrote:
> +1
> 
> On Fri, Jan 20, 2017 at 9:51 PM, Apurva Mehta 
> wrote:
> 
> > +1
> >
> > On Fri, Jan 20, 2017 at 5:19 PM, Jason Gustafson 
> > wrote:
> >
> > > +1
> > >
> > > On Fri, Jan 20, 2017 at 4:51 PM, Ismael Juma  wrote:
> > >
> > > > Good catch, Colin. +1 to editing the wiki to match the desired
> > behaviour
> > > > and what was implemented in 0.10.1.
> > > >
> > > > Ismael
> > > >
> > > > On Sat, Jan 21, 2017 at 12:19 AM, Colin McCabe 
> > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > While looking at some code related to KIP-74, I noticed a slight
> > > > > discrepancy between the text on the wiki and the implementation.  The
> > > > > wiki says that "If max_bytes is Int.MAX_INT, new request behaves
> > > exactly
> > > > > like old one."  This would mean that if there was a single message
> > that
> > > > > was larger than the maximum bytes per partition, zero messages would
> > be
> > > > > returned, and clients would throw MessageSizeTooLargeException.
> > > > > However, the code does not implement this.  Instead, it implements
> > the
> > > > > "new" behavior where the client always gets at least one message.
> > > > >
> > > > > The new behavior seems to be more desirable, since clients do not
> > "get
> > > > > stuck" on messages that are too big.  I propose that we edit the wiki
> > > to
> > > > > reflect the implemented behavior by deleting the references to
> > special
> > > > > behavior when max_bytes is MAX_INT.
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > > >
> > >
> >


Build failed in Jenkins: kafka-trunk-jdk8 #1205

2017-01-23 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-4060 Follow-up: update docs accordingly

[jason] KAFKA-4633; Always using regex pattern subscription in StreamThread

--
[...truncated 8401 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > 

Jenkins build is back to normal : kafka-trunk-jdk7 #1866

2017-01-23 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #2424: KAFKA-4688: The Docker image should contain versio...

2017-01-23 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2424

KAFKA-4688: The Docker image should contain version 0.10.1.0 of Kafka



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4688

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2424


commit def83703c32fff3b18b60dde4acc2041b5178003
Author: Colin P. Mccabe 
Date:   2017-01-23T20:54:49Z

KAFKA-4688: The Docker image should contain version 0.10.1.0 of Kafka






Re: Request to provide rights to assign JIRA tickets to myself

2017-01-23 Thread Jason Gustafson
Done. You should be able to assign yourself the JIRA now.

-Jason

On Mon, Jan 23, 2017 at 2:27 AM, Akhilesh Naidu <
akhilesh_na...@persistent.com> wrote:

> Hi,
>
>
> I would be interested to contribute to the Kafka project,
>
> and was presently looking into the below issue
>
> KAFKA-4566
> Can't Symlink to Kafka bins
>
>
> Could someone please provide me the rights to assign JIRA tickets to myself?
>
>
>
> Regards
>
> Akhilesh
>
>
>
>
>
>


Build failed in Jenkins: kafka-trunk-jdk7 #1865

2017-01-23 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: KAFKA-4060 and KAFKA-4476 follow up

[wangguoz] KAFKA-4060 Follow-up: update docs accordingly

--
[...truncated 8392 lines...]

kafka.log.LogTest > testReadWithTooSmallMaxLength STARTED

kafka.log.LogTest > testReadWithTooSmallMaxLength PASSED

kafka.log.LogTest > testOverCompactedLogRecovery STARTED

kafka.log.LogTest > testOverCompactedLogRecovery PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved STARTED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testCompressedMessages STARTED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload STARTED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog STARTED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset STARTED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testReopenThenTruncate STARTED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName STARTED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles STARTED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages STARTED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages PASSED

kafka.log.LogTest > testSizeBasedLogRoll STARTED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
STARTED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter STARTED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testParseTopicPartitionName STARTED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testTruncateTo STARTED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile STARTED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets STARTED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > 

Build failed in Jenkins: kafka-trunk-jdk8 #1204

2017-01-23 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: Improve scaladoc for Authorizer

[wangguoz] HOTFIX: KAFKA-4060 and KAFKA-4476 follow up

--
[...truncated 28498 lines...]
org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
generateCredential PASSED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
extraneousFields STARTED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
extraneousFields PASSED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
scramCredentialCache STARTED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
scramCredentialCache PASSED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
invalidCredential STARTED

org.apache.kafka.common.security.scram.ScramCredentialUtilsTest > 
invalidCredential PASSED

org.apache.kafka.common.security.scram.ScramFormatterTest > rfc7677Example 
STARTED

org.apache.kafka.common.security.scram.ScramFormatterTest > rfc7677Example 
PASSED

org.apache.kafka.common.security.scram.ScramFormatterTest > saslName STARTED

org.apache.kafka.common.security.scram.ScramFormatterTest > saslName PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse STARTED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.JaasUtilsTest > testMissingOptionValue STARTED

org.apache.kafka.common.security.JaasUtilsTest > testMissingOptionValue PASSED

org.apache.kafka.common.security.JaasUtilsTest > testSingleOption STARTED

org.apache.kafka.common.security.JaasUtilsTest > testSingleOption PASSED

org.apache.kafka.common.security.JaasUtilsTest > testNumericOptionWithoutQuotes 
STARTED

org.apache.kafka.common.security.JaasUtilsTest > testNumericOptionWithoutQuotes 
PASSED

org.apache.kafka.common.security.JaasUtilsTest > testConfigNoOptions STARTED

org.apache.kafka.common.security.JaasUtilsTest > testConfigNoOptions PASSED

org.apache.kafka.common.security.JaasUtilsTest > testNumericOptionWithQuotes 
STARTED

org.apache.kafka.common.security.JaasUtilsTest > testNumericOptionWithQuotes 
PASSED

org.apache.kafka.common.security.JaasUtilsTest > testQuotedOptionValue STARTED

org.apache.kafka.common.security.JaasUtilsTest > testQuotedOptionValue PASSED

org.apache.kafka.common.security.JaasUtilsTest > testMissingLoginModule STARTED

org.apache.kafka.common.security.JaasUtilsTest > testMissingLoginModule PASSED

org.apache.kafka.common.security.JaasUtilsTest > testMissingSemicolon STARTED

org.apache.kafka.common.security.JaasUtilsTest > testMissingSemicolon PASSED

org.apache.kafka.common.security.JaasUtilsTest > testMultipleOptions STARTED


Re: [VOTE] KIP-107 Add purgeDataBefore() API in AdminClient

2017-01-23 Thread Dong Lin
Thanks for the comment, Jun.

Yeah, I think there are use cases where this can be useful. Allowing for
asynchronous delete will be useful if an application doesn't need a strong
guarantee from purgeDataFrom(), e.g. if it is done to help reduce the disk
usage of Kafka. The application may want to purge data every time it does an
auto-commit, without waiting for the future object to complete. On the other
hand, synchronous delete will be useful if an application wants to make sure
that sensitive or bad data is definitely deleted. I think returning a future
makes both choices available to the user, and it doesn't complicate the
implementation much.
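
To illustrate why a Future keeps both options open, here is a small usage
sketch. The PurgeClient interface and the Map key type are placeholders, not
the KIP's actual signatures:

    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class PurgeUsageSketch {

        // Placeholder for the proposed admin API returning a Future.
        interface PurgeClient {
            Future<Map<String, Long>> purgeDataBefore(Map<String, Long> offsets);
        }

        static void purgeAsync(PurgeClient client, Map<String, Long> offsets) {
            // Asynchronous style: kick off the purge (e.g. alongside an
            // auto-commit) and carry on without waiting for the result.
            client.purgeDataBefore(offsets);
        }

        static Map<String, Long> purgeSync(PurgeClient client, Map<String, Long> offsets)
                throws Exception {
            // Synchronous style: block until the broker confirms, for callers
            // that need to know the data is really gone.
            return client.purgeDataBefore(offsets).get(30, TimeUnit.SECONDS);
        }
    }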


On Mon, Jan 23, 2017 at 10:45 AM, Jun Rao  wrote:

> I feel that it's simpler to just keep the format of the checkpoint file as
> it is and just add a separate checkpoint for low watermark. Low watermark
> and high watermark are maintained independently. So, not sure if there is
> significant benefit of storing them together.
>
> Looking at the KIP again. I actually have another question on the api. Is
> there any benefit of returning a Future in the purgeDataBefore() api? Since
> admin apis are used infrequently, it seems that it's simpler to just have a
> blocking api and returns Map?
>
> Thanks,
>
> Jun
>
> On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin  wrote:
>
> > Thanks for the comment Guozhang. Please don't worry about being late. I
> > would like to update the KIP if there is clear benefit of the new
> approach.
> > I am wondering if there is any use-case or operation aspects that would
> > benefit from the new approach.
> >
> > I am not saying that these checkpoint files have the same priority. I
> > mentioned other checkpoint files to suggest that it is OK to add one more
> > checkpoint file. To me three checkpoint files is not much different from
> > four checkpoint files. I am just inclined to not update the KIP if the
> only
> > benefit is to avoid addition of a new checkpoint file.
> >
> >
> >
> > On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang 
> wrote:
> >
> > > To me the distinction between recovery-checkpoint and
> > > replication-checkpoint is different from the distinction between these
> > two
> > > hw checkpoint values: when a broker starts up and acts as the leader for a
> > > partition, it can live without seeing the recovery checkpoint, but just
> > > cannot rely on the existing last log segment and need to fetch from
> other
> > > replicas; but if the replication-checkpoint file is missing, it is a
> > > correctness issue, as it does not know from where to truncate its data,
> > and
> > > also how to respond to a fetch request. That is why I think we can
> > separate
> > > these two types of files, since the latter one is more important than
> the
> > > previous one.
> > >
> > > That being said, I do not want to recall another vote on this since it
> is
> > > my bad not responding before the vote is called. Just wanted to point
> out
> > > for the record that this approach may have some operational scenarios
> > where
> > > one of the replication files is missing and we need to treat them
> > > specifically.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin  wrote:
> > >
> > > > Yeah, your solution of adding new APIs certainly works and I don't
> > think
> > > > that is an issue. On the other hand I don't think it is an issue to
> > add a
> > > > new checkpoint file as well since we already have multiple checkpoint
> > > > files. The benefit of the new approach you mentioned is probably not
> an
> > > > issue in the current approach since high watermark and low watermark
> > > works
> > > > completely independently. Since there is no strong reason to choose
> > > either
> > > > of them, I am inclined to choose the one that makes less format
> change
> > > and
> > > > simpler in the Java API. The current approach seems better w.r.t this
> > > minor
> > > > reason.
> > > >
> > > > If you feel strongly that we should use the new approach, I can do that
> as
> > > > well. Please let me know if you think so, and I will need to ask
> > > > Jun/Joel/Becket to vote on this again since this changes the
> interface
> > of
> > > > the KIP.
> > > >
> > > > On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > I think this is less of an issue: we can use the same patterns as
> in
> > > the
> > > > > request protocol, i.e.:
> > > > >
> > > > > write(Map[TP, Long]) // write the checkout point in v0 format
> > > > > write(Map[TP, Pair[Long, Long]]) // write the checkout point in v1
> > > format
> > > > >
> > > > > CheckpointedOffsets read() // read the file relying on its version
> id
> > > > >
> > > > > class CheckpointedOffsets {
> > > > >
> > > > > Integer getVersion();
> > > > > Long getFirstOffset();
> > > > > Long getSecondOffset();   // would return NO_AVAILABLE with v0
> > 

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-23 Thread Dong Lin
Thanks. Please see my comment inline.

On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
wrote:

>
>
> 13.01.2017, 22:29, "Dong Lin" :
> > Hey Alexey,
> >
> > Thanks for your review and the alternative approach. Here is my
> > understanding of your patch. kafka's background threads are used to move
> > data between replicas. When data movement is triggered, the log will be
> > rolled and the new logs will be put in the new directory, and background
> > threads will move segment from old directory to new directory.
> >
> > It is important to note that KIP-112 is intended to work with KIP-113 to
> > support JBOD. I think your solution is definitely simpler and better
> under
> > the current kafka implementation that a broker will fail if any disk
> fails.
> > But I am not sure if we want to allow broker to run with partial disks
> > failure. Let's say a replica is being moved from log_dir_old to
> > log_dir_new and then log_dir_old stops working due to disk failure. How
> > would your existing patch handles it? To make the scenario a bit more
>
> We will lose log_dir_old. After broker restart we can read the data from
> log_dir_new.
>

No, you probably can't. This is because the broker doesn't have *all* the
data for this partition. For example, say the broker has
partition_segment_1, partition_segment_50 and partition_segment_100 on
log_dir_old. partition_segment_100, which has the latest data, has been
moved to log_dir_new, and log_dir_old fails before partition_segment_50
and partition_segment_1 are moved to log_dir_new. When the broker restarts, it
won't have partition_segment_50. This causes a problem if the broker is elected
leader and a consumer wants to consume data in partition_segment_1.


>
> > complicated, let's say the broker is shut down, log_dir_old's disk fails,
> > and the broker starts. In this case broker doesn't even know if
> log_dir_new
> > has all the data needed for this replica. It becomes a problem if the
> > broker is elected leader of this partition in this case.
>
> log_dir_new contains the most recent data so we will lose the tail of the
> partition.
> This is not a big problem for us because we already delete tails by hand
> (see https://issues.apache.org/jira/browse/KAFKA-1712).
> Also we don't use automatic leader balancing
> (auto.leader.rebalance.enable=false),
> so this partition becomes the leader with a low probability.
> I think my patch can be modified to prohibit the selection of the leader
> until the partition has moved completely.
>

I guess you are saying that you have deleted the tails by hand in your own
kafka branch. But KAFKA-1712 is not accepted into Kafka trunk and I am not
sure if it is the right solution. How would this solution address the
problem mentioned above?

BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to
address its problem. Now that we have a timestamp in the message, we can use
that to delete old segments instead of relying on the log segment mtime.
Just an idea; we don't have to discuss this problem here.


>
> >
> > The solution presented in the KIP attempts to handle it by replacing
> > replica in an atomic version fashion after the log in the new dir has
> fully
> > caught up with the log in the old dir. At at time the log can be
> considered
> > to exist on only one log directory.
>
> As I understand your solution does not cover quotas.
> What happens if someone starts to transfer 100 partitions ?
>

Good point. Quota can be implemented in the future. It is currently
mentioned as a potential future improvement in KIP-112. Thanks
for the reminder. I will move it to KIP-113.


>
> > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
> append the message set to topicPartition.move
>
> i.e. processPartitionData will read data from the beginning of
> topicPartition.log? What is the read size?
> A ReplicaFetchThread reads many partitions so if one does some complicated
> work (= read a lot of data from disk) everything will slow down.
> I think read size should not be very big.
>
> On the other hand at this point (processPartitionData) one can use only
> the new data (ByteBufferMessageSet  from parameters) and wait until
> (topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset
> && topicPartition.move.largestOffset == topicPartition.log.largestOffset).
> In this case the write speed to topicPartition.move and topicPartition.log
> will be the same so this will allow us to move many partitions to one disk.
>
>
The read size of a given partition is configured
using replica.fetch.max.bytes, which is the same size used by FetchRequest
from follower to leader. If the broker is moving a replica for which it
acts as a follower, the disk write rate for moving this replica is at most
the rate it fetches from leader (assume it is catching up and has

[GitHub] kafka pull request #2379: KAFKA-4633: Always using regex pattern subscriptio...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2379


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-107 Add purgeDataBefore() API in AdminClient

2017-01-23 Thread Jun Rao
I feel that it's simpler to just keep the format of the checkpoint file as
it is and just add a separate checkpoint for low watermark. Low watermark
and high watermark are maintained independently. So, not sure if there is
significant benefit of storing them together.

Looking at the KIP again, I actually have another question on the api. Is
there any benefit of returning a Future in the purgeDataBefore() api? Since
admin apis are used infrequently, it seems that it's simpler to just have a
blocking api that returns a Map?

Thanks,

Jun

On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin  wrote:

> Thanks for the comment Guozhang. Please don't worry about being late. I
> would like to update the KIP if there is clear benefit of the new approach.
> I am wondering if there is any use-case or operation aspects that would
> benefit from the new approach.
>
> I am not saying that these checkpoint files have the same priority. I
> mentioned other checkpoint files to suggest that it is OK to add one more
> checkpoint file. To me three checkpoint files is not much different from
> four checkpoint files. I am just inclined to not update the KIP if the only
> benefit is to avoid addition of a new checkpoint file.
>
>
>
> On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang  wrote:
>
> > To me the distinction between recovery-checkpoint and
> > replication-checkpoint is different from the distinction between these
> two
> > hw checkpoint values: when a broker starts up and acts as the leader for a
> > partition, it can live without seeing the recovery checkpoint, but just
> > cannot rely on the existing last log segment and need to fetch from other
> > replicas; but if the replication-checkpoint file is missing, it is a
> > correctness issue, as it does not know from where to truncate its data,
> and
> > also how to respond to a fetch request. That is why I think we can
> separate
> > these two types of files, since the latter one is more important than the
> > previous one.
> >
> > That being said, I do not want to recall another vote on this since it is
> > my bad not responding before the vote is called. Just wanted to point out
> > for the record that this approach may have some operational scenarios
> where
> > one of the replication files is missing and we need to treat them
> > specifically.
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin  wrote:
> >
> > > Yeah, your solution of adding new APIs certainly works and I don't
> think
> > > that is an issue. On the other hand I don't think it is an issue to
> add a
> > > new checkpoint file as well since we already have multiple checkpoint
> > > files. The benefit of the new approach you mentioned is probably not an
> > > issue in the current approach since high watermark and low watermark
> > works
> > > completely independently. Since there is no strong reason to choose
> > either
> > > of them, I am inclined to choose the one that makes less format change
> > and
> > > simpler in the Java API. The current approach seems better w.r.t this
> > minor
> > > reason.
> > >
> > > If you feel strongly that we should use the new approach, I can do that as
> > > well. Please let me know if you think so, and I will need to ask
> > > Jun/Joel/Becket to vote on this again since this changes the interface
> of
> > > the KIP.
> > >
> > > On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang 
> > wrote:
> > >
> > > > I think this is less of an issue: we can use the same patterns as in
> > the
> > > > request protocol, i.e.:
> > > >
> > > > write(Map[TP, Long]) // write the checkout point in v0 format
> > > > write(Map[TP, Pair[Long, Long]]) // write the checkout point in v1
> > format
> > > >
> > > > CheckpointedOffsets read() // read the file relying on its version id
> > > >
> > > > class CheckpointedOffsets {
> > > >
> > > > Integer getVersion();
> > > > Long getFirstOffset();
> > > > Long getSecondOffset();   // would return NO_AVAILABLE with v0
> > format
> > > > }
> > > >
> > > >
> > > > As I think of it, another benefit is that we wont have a partition
> that
> > > > only have one of the watermarks in case of a failure in between
> writing
> > > two
> > > > files.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin 
> > wrote:
> > > >
> > > > > Hey Guozhang,
> > > > >
> > > > > Thanks for the review:) Yes it is possible to combine them. Both
> > > solution
> > > > > will have the same performance. But I think the current solution
> will
> > > > give
> > > > > us simpler Java class design. Note that we will have to change Java
> > API
> > > > > (e.g. read() and write()) of OffsetCheckpoint class in order to
> > > provide a
> > > > > map from TopicPartition to a pair of integers when we write to
> > > checkpoint
> > > > > file. This makes this class less generic since this API is not used
> > by
> 
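
For reference, a minimal sketch of the versioned checkpoint value outlined in
the quoted thread above. Class, method and constant names are assumptions taken
from the discussion, not actual Kafka classes.

// Illustrative only: one checkpoint entry carrying one offset (v0) or two (v1).
final class CheckpointedOffsets {
    static final long NO_AVAILABLE = -1L; // returned for the second offset in v0 files

    private final int version;
    private final long firstOffset;   // e.g. high watermark
    private final long secondOffset;  // e.g. low watermark, only present in v1

    CheckpointedOffsets(int version, long firstOffset, long secondOffset) {
        this.version = version;
        this.firstOffset = firstOffset;
        this.secondOffset = secondOffset;
    }

    int getVersion() { return version; }
    long getFirstOffset() { return firstOffset; }
    long getSecondOffset() { return version == 0 ? NO_AVAILABLE : secondOffset; }
}

One benefit noted in the thread: with both offsets in a single versioned file, a
failure between writing two separate files cannot leave a partition with only
one of the two watermarks.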

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-01-23 Thread Onur Karaman
I only did a quick scan but I wanted to point out what I think is an
incorrect assumption in the KIP's caveats:
"
There is a risk using the MemoryPool that, after we fill up the memory with
fetch data, we can starve the coordinator's connection
...
To alleviate this issue, only messages larger than 1Kb will be allocated in
the MemoryPool. Smaller messages will be allocated directly on the Heap
like before. This allows group/heartbeat messages to avoid being delayed if
the MemoryPool fills up.
"

So it sounds like there's an incorrect assumption that responses from the
coordinator will always be small (< 1Kb as mentioned in the caveat). There
are now a handful of request types between clients and the coordinator:
{JoinGroup, SyncGroup, LeaveGroup, Heartbeat, OffsetCommit, OffsetFetch,
ListGroups, DescribeGroups}. While true (at least today) for
HeartbeatResponse and a few others, I don't think we can assume
JoinGroupResponse, SyncGroupResponse, DescribeGroupsResponse, and
OffsetFetchResponse will be small, as they are effectively bounded by the
max message size allowed by the broker for the __consumer_offsets topic
which by default is 1MB.
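
For concreteness, a minimal sketch of the threshold rule described in the
caveat. MemoryPool here is a stand-in interface and the 1 KB cutoff is the
value quoted above; neither is taken from the actual client code.

import java.nio.ByteBuffer;

interface MemoryPool {
    ByteBuffer tryAllocate(int sizeBytes); // may return null when the pool is exhausted
}

final class ResponseBufferAllocator {
    private static final int SMALL_MESSAGE_THRESHOLD = 1024; // the "1Kb" from the caveat

    static ByteBuffer allocate(MemoryPool pool, int responseSize) {
        if (responseSize <= SMALL_MESSAGE_THRESHOLD) {
            // Small responses (e.g. heartbeats) bypass the pool so they are never delayed.
            return ByteBuffer.allocate(responseSize);
        }
        // Larger responses compete for pooled memory. As noted above, some coordinator
        // responses (JoinGroup, SyncGroup, OffsetFetch, DescribeGroups) can exceed the
        // threshold and end up here as well.
        return pool.tryAllocate(responseSize);
    }
}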

On Mon, Jan 23, 2017 at 9:46 AM, Mickael Maison 
wrote:

> I've updated the KIP to address all the comments raised here and from
> the "DISCUSS" thread.
> See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>
> Now, I'd like to restart the vote.
>
> On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
>  wrote:
> > Hi Mickael,
> >
> > I am +1 on the overall approach of this KIP, but have a couple of
> comments
> > (sorry, should have brought them up on the discuss thread earlier):
> >
> > 1. Perhaps it would be better to do this after KAFKA-4137
> >  is implemented? At
> the
> > moment, coordinator shares the same NetworkClient (and hence the same
> > Selector) with consumer connections used for fetching records. Since
> > freeing of memory relies on consuming applications invoking poll() after
> > processing previous records and potentially after committing offsets, it
> > will be good to ensure that coordinator is not blocked for read by fetch
> > responses. This may be simpler once coordinator has its own Selector.
> >
> > 2. The KIP says: *Once messages are returned to the user, messages are
> > deleted from the MemoryPool so new messages can be stored.*
> > Can you expand that a bit? I am assuming that partial buffers never get
> > freed when some messages are returned to the user since the consumer is
> > still holding a reference to the buffer. Would buffers be freed when
> > fetches for all the partitions in a response are parsed, but perhaps not
> > yet returned to the user (i.e., is the memory freed when a reference to
> the
> > response buffer is no longer required)? It will be good to document the
> > (approximate) maximum memory requirement for the non-compressed case.
> There
> > is data read from the socket, cached in the Fetcher and (as Radai has
> > pointed out), the records still with the user application.
> >
> >
> > On Tue, Dec 6, 2016 at 2:04 AM, radai 
> wrote:
> >
> >> +1 (non-binding).
> >>
> >> small nit pick - just because you returned a response to the user doesn't
> mean
> >> the memory is no longer used. for some cases the actual "point of
> >> termination" may be the deserializer (really impl-dependent), but
> >> generally, wouldn't it be "nice" to have an explicit dispose() call on
> >> responses (with the addition that getting the next batch of data from a
> >> consumer automatically disposes the previous results)
> >>
> >> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar 
> wrote:
> >>
> >> > +1 (non binding)
> >> > --
> >> > Edoardo Comar
> >> > IBM MessageHub
> >> > eco...@uk.ibm.com
> >> > IBM UK Ltd, Hursley Park, SO21 2JN
> >> >
> >> > IBM United Kingdom Limited Registered in England and Wales with number
> >> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> >> PO6
> >> > 3AU
> >> >
> >> >
> >> >
> >> > From:   Mickael Maison 
> >> > To: dev@kafka.apache.org
> >> > Date:   05/12/2016 14:38
> >> > Subject:[VOTE] KIP-81: Bound Fetch memory usage in the
> consumer
> >> >
> >> >
> >> >
> >> > Hi all,
> >> >
> >> > I'd like to start the vote for KIP-81:
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> >> >
> >> >
> >> > Thank you
> >> >
> >> >
> >> >
> >> >
> >> > Unless stated otherwise above:
> >> > IBM United Kingdom Limited - Registered in England and Wales with
> number
> >> > 741598.
> >> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> >> 3AU
> >> >
> >>
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
>


[GitHub] kafka pull request #2377: Kafka 4060 docs update

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2377


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Build failed in Jenkins: kafka-trunk-jdk8 #1203

2017-01-23 Thread Apache Jenkins Server
See 

Changes:

[ismael] MINOR: Pass RecordingLevel to MetricConfig in the broker

--
[...truncated 20927 lines...]
org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > testCommitTimeout 
PASSED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume STARTED

org.apache.kafka.connect.runtime.WorkerSinkTaskThreadedTest > 
testAssignmentPauseResume PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignmentSingleTaskConnectors PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupFollower PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testMetadata PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment1 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testLeaderPerformAssignment2 PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testJoinLeaderCannotAssign PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testRejoinGroup PASSED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader STARTED

org.apache.kafka.connect.runtime.distributed.WorkerCoordinatorTest > 
testNormalJoinGroupLeader PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testPutConnectorConfig PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testAccessors PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedBasicValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testDestroyConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorAlreadyExists PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRestartTask PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testCreateConnectorFailedCustomValidation PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testJoinAssignment PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalance PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testRebalanceFailedConnector PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker STARTED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testHaltCleansUpWorker PASSED

org.apache.kafka.connect.runtime.distributed.DistributedHerderTest > 
testConnectorNameConflictsWithWorkerGroupId STARTED


Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-23 Thread Matthias J. Sax
I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.


-Matthias
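
For reference, a toy model of the "materialized flag" semantics discussed in
the quoted thread below. KTableView and its methods are hypothetical
illustrations, not the actual Kafka Streams API.

import java.util.concurrent.atomic.AtomicInteger;

final class KTableView {
    private static final AtomicInteger NAME_COUNTER = new AtomicInteger();

    private boolean materialized = false;
    private String stateStoreName = null;

    // Overload without a name: generate an internal store name.
    KTableView materialize() {
        return materialize("generated-store-" + NAME_COUNTER.incrementAndGet());
    }

    // Idempotent: repeated calls only update the name, they never create a second store.
    KTableView materialize(String storeName) {
        if (storeName == null)
            throw new IllegalArgumentException("store name must not be null");
        materialized = true;
        stateStoreName = storeName;
        return this;
    }

    // Returns the store name only if the table is materialized (queryable).
    String stateStoreName() {
        return materialized ? stateStoreName : null;
    }
}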

On 1/23/17 1:39 AM, Damian Guy wrote:
> I'm not a fan of materialize. I think it interrupts the flow, i.e,
> 
> table.mapValues(..).materialize().join(..).materialize()
> compared to:
> table.mapValues(..).join(..)
> 
> I know which one I prefer.
> My preference is still to provide overloaded methods where people can
> specify the store names if they want, otherwise we just generate them.
> 
> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax  wrote:
> 
>> Hi,
>>
>> thanks for the KIP Eno! Here are my 2 cents:
>>
>> 1) I like Guozhang's proposal about removing the store name from all KTable
>> methods and generating internal names (however, I would do this as
>> overloads). Furthermore, I would not force users to call .materialize()
>> if they want to query a store, but add one more method .stateStoreName()
>> that returns the store name if the KTable is materialized. Thus,
>> .materialize() does not necessarily need a storeName parameter (ie, we
>> should have some overloads here).
>>
>> I would also not allow providing a null store name (to indicate no
>> materialization if not necessary) but would throw an exception.
>>
>> This yields some simplification (see below).
>>
>>
>> 2) I also like Guozhang's proposal about KStream#toTable()
>>
>>
>> 3)
>>>
   3. What will happen when you call materialize on KTable that is
>> already
   materialized? Will it create another StateStore (providing the name is
   different), throw an Exception?
>>>
>>> Currently an exception is thrown, but see below.
>>>
>>>
>>
>> If we follow approach (1) from Guozhang, there is no need to worry about
>> a second materialization, and no exception needs to be thrown. A call to
>> .materialize() basically sets a "materialized flag" (ie, idempotent
>> operation) and sets a new name.
>>
>>
>> 4)
 Rename toStream() to toKStream() for consistency.
>>>
>>> Not sure whether that is really required. We also use
>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>> don't care about the "K" prefix.
>>
>> Eno's reply:
>>> I think changing it to `toKStream` would make it absolutely clear what
>> we are converting it to.
>>>
>>> I'd say we should probably change the KStreamBuilder methods (but not in
>> this KIP).
>>
>> I would keep #toStream(). (see below)
>>
>>
>> 5) We should not remove any methods but only deprecate them.
>>
>>
>>
>> A general note:
>>
>> I do not understand your comments "Rejected Alternatives". You say "Have
>> the KTable be the materialized view" was rejected. But your KIP actually
>> does exactly this -- the changelog abstraction of KTable is secondary
>> after those changes and the "view" abstraction is what a KTable is. And
>> just to be clear, I like this a lot:
>>
>>  - it aligns with the name KTable
>>  - it aligns with stream-table-duality
>>  - it aligns with IQ
>>
>> I would say that a KTable is a "view abstraction" (as materialization is
>> optional).
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>> comments:
>>>
>>> 1. I like the materialize() function in general, but I would like to see
>>> how other KTable functions should be updated accordingly. For example, 1)
>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>> always materialize the KTable unless its state store name is set to null;
>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>> also has a state store name; 3) KTable.join requires the joining table
>> to
>>> be materialized. And today we do not actually have a mechanism to enforce
>>> that, but will only throw an exception at runtime if it is not (e.g. if
>> you
>>> have "builder.table("topic", null).join()" a RTE will be thrown).
>>>
>>> I'd make an extended proposal just to kick off the discussion here: let's
>>> remove all the state store params in other KTable functions, and if in
>> some
>>> cases KTable have to be materialized (e.g. KTable resulted from KXX.agg)
>>> and users do not call materialize(), then we treat it as "users are not
>>> interested in querying it at all" and hence use an internal name
>> generated
>>> for the materialized KTable; i.e. although it is materialized the state
>>> store is not exposed to users. And if users call materialize() afterwards
>>> but we have already decided to materialize it, we can replace the
>> internal
>>> name with the user-provided name. Then from a user's point of view, if
>> they
>>> ever want to query a KTable, they have to call materialize() with a given
>>> state store name. This approach has one awkwardness though, that serdes

Re: [VOTE] KIP-81: Bound Fetch memory usage in the consumer

2017-01-23 Thread Mickael Maison
I've updated the KIP to address all the comments raised here and from
the "DISCUSS" thread.
See: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer

Now, I'd like to restart the vote.

On Tue, Dec 6, 2016 at 9:02 AM, Rajini Sivaram
 wrote:
> Hi Mickael,
>
> I am +1 on the overall approach of this KIP, but have a couple of comments
> (sorry, should have brought them up on the discuss thread earlier):
>
> 1. Perhaps it would be better to do this after KAFKA-4137
>  is implemented? At the
> moment, coordinator shares the same NetworkClient (and hence the same
> Selector) with consumer connections used for fetching records. Since
> freeing of memory relies on consuming applications invoking poll() after
> processing previous records and potentially after committing offsets, it
> will be good to ensure that coordinator is not blocked for read by fetch
> responses. This may be simpler once coordinator has its own Selector.
>
> 2. The KIP says: *Once messages are returned to the user, messages are
> deleted from the MemoryPool so new messages can be stored.*
> Can you expand that a bit? I am assuming that partial buffers never get
> freed when some messages are returned to the user since the consumer is
> still holding a reference to the buffer. Would buffers be freed when
> fetches for all the partitions in a response are parsed, but perhaps not
> yet returned to the user (i.e., is the memory freed when a reference to the
> response buffer is no longer required)? It will be good to document the
> (approximate) maximum memory requirement for the non-compressed case. There
> is data read from the socket, cached in the Fetcher and (as Radai has
> pointed out), the records still with the user application.
>
>
> On Tue, Dec 6, 2016 at 2:04 AM, radai  wrote:
>
>> +1 (non-binding).
>>
>> small nit pick - just because you returned a response to the user doesn't mean
>> the memory is no longer used. for some cases the actual "point of
>> termination" may be the deserializer (really impl-dependent), but
>> generally, wouldn't it be "nice" to have an explicit dispose() call on
>> responses (with the addition that getting the next batch of data from a
>> consumer automatically disposes the previous results)
>>
>> On Mon, Dec 5, 2016 at 6:53 AM, Edoardo Comar  wrote:
>>
>> > +1 (non binding)
>> > --
>> > Edoardo Comar
>> > IBM MessageHub
>> > eco...@uk.ibm.com
>> > IBM UK Ltd, Hursley Park, SO21 2JN
>> >
>> > IBM United Kingdom Limited Registered in England and Wales with number
>> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
>> PO6
>> > 3AU
>> >
>> >
>> >
>> > From:   Mickael Maison 
>> > To: dev@kafka.apache.org
>> > Date:   05/12/2016 14:38
>> > Subject:[VOTE] KIP-81: Bound Fetch memory usage in the consumer
>> >
>> >
>> >
>> > Hi all,
>> >
>> > I'd like to start the vote for KIP-81:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 81%3A+Bound+Fetch+memory+usage+in+the+consumer
>> >
>> >
>> > Thank you
>> >
>> >
>> >
>> >
>> > Unless stated otherwise above:
>> > IBM United Kingdom Limited - Registered in England and Wales with number
>> > 741598.
>> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
>> 3AU
>> >
>>
>
>
>
> --
> Regards,
>
> Rajini


[GitHub] kafka pull request #2375: Kafka 4060 remove zk client dependency in kafka st...

2017-01-23 Thread hjafarpour
Github user hjafarpour closed the pull request at:

https://github.com/apache/kafka/pull/2375


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2418: HOTFIX: KAFKA-4060 and KAFKA-4476 follow up

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2418


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-111 : Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-01-23 Thread Ismael Juma
Hi Mayuresh,

Thanks for updating the KIP. A couple of questions:

1. PrincipalBuilder implements Configurable and gets a map of properties
via the `configure` method. Do we really need a new `buildPrincipal` method
given that?

2. Jun suggested in the JIRA that it may make sense to pass the
`channelPrincipal` as a field in `Session` instead of `KafkaPrincipal`. It
would be good to understand why this was rejected.

Ismael

On Thu, Jan 12, 2017 at 7:07 PM, Ismael Juma  wrote:

> Hi Mayuresh,
>
> Thanks for the KIP. A quick comment before I do a more detailed analysis,
> the KIP says:
>
> `This KIP is a pure addition to existing functionality and does not
> include any backward incompatible changes.`
>
> However, the KIP is proposing the addition of a method to the
> PrincipalBuilder pluggable interface, which is not a compatible change.
> Existing implementations would no longer compile, for example. It would be
> good to make this clear in the KIP.
>
> Ismael
>
> On Thu, Jan 12, 2017 at 5:44 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Hi all.
>>
>> We created KIP-111 to propose that Kafka should preserve the Principal
>> generated by the PrincipalBuilder while processing the request received on
>> socket channel, on the broker.
>>
>> Please find the KIP wiki in the link
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67638388
>> .
>> We would love to hear your comments and suggestions.
>>
>>
>> Thanks,
>>
>> Mayuresh
>>
>
>


SASL for ZK\Kafka

2017-01-23 Thread Shrikant Patel
Hi

I was trying to secure the communication between ZK and Kafka. We generate the
keytab file with the principals listed below.

We were following this document - 
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
 (really detailed doc)

For Kafka - 
kafka/xx--xx.x@x.com

For ZK - zk//xx--xx.x@x.com
(our IT expert was running into an issue creating the principal as in the link,
because AD has a 20-character limit)

Since we were running into this issue, we enabled the SASL debug flag
-Dsun.security.krb5.debug=true

And we see the error below. I don't have in-depth knowledge of SASL, so I wanted to
check with the group to see if anyone has faced this issue.

>>>KRBError:
 sTime is Wed Jan 18 09:46:12 CST 2017 1484754372000
 suSec is 434552
 error code is 24
 error Message is Pre-authentication information was invalid
 sname is krbtgt/x@x.com
 eData provided.
msgType is 30
>>>Pre-Authentication Data:
 PA-DATA type = 19
 PA-ETYPE-INFO2 etype = 17, salt = X.COMzkxx--xx.x.com, 
s2kparams = null
 PA-ETYPE-INFO2 etype = 23, salt = null, s2kparams = null

[2017-01-18 09:46:12,517] ERROR Unexpected exception, exiting abnormally 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
java.io.IOException: Could not configure server because SASL configuration did 
not allow the  ZooKeeper server to authenticate itself properly: 
javax.security.auth.login.LoginException: Pre-authentication information was 
invalid (24)
at 
org.apache.zookeeper.server.ServerCnxnFactory.configureSaslLogin(ServerCnxnFactory.java:207)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:82)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:130)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:111)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)




Thanks,
Shri
__
Shrikant Patel   |   PDX-NHIN
Enterprise Architecture Team
Asserting the Role of Pharmacy in Healthcare  
www.pdxinc.com
main 817.367.4302
101 Jim Wright Freeway South, Suite 200, Fort Worth, Texas 
76108-2202




[GitHub] kafka pull request #2151: Feature/authorizer name reference

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2151


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2350: MINOR: Finished exposing the broker config

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2350


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2017-01-23 Thread Andrew Olson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-1379:

Component/s: log

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times, reassigned partitions will result in
> data that has been on a leader being retained for another full retention
> cycle. E.g., if retention is seven days and you reassign partitions on the
> sixth day then those partitions will remain on the replicas for another
> seven days.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-23 Thread Alexey Ozeritsky


13.01.2017, 22:29, "Dong Lin" :
> Hey Alexey,
>
> Thanks for your review and the alternative approach. Here is my
> understanding of your patch. kafka's background threads are used to move
> data between replicas. When data movement is triggered, the log will be
> rolled and the new logs will be put in the new directory, and background
> threads will move segment from old directory to new directory.
>
> It is important to note that KIP-112 is intended to work with KIP-113 to
> support JBOD. I think your solution is definitely simpler and better under
> the current kafka implementation that a broker will fail if any disk fails.
> But I am not sure if we want to allow broker to run with partial disks
> failure. Let's say a replica is being moved from log_dir_old to
> log_dir_new and then log_dir_old stops working due to disk failure. How
> would your existing patch handles it? To make the scenario a bit more

We will lose log_dir_old. After broker restart we can read the data from 
log_dir_new.

> complicated, let's say the broker is shut down, log_dir_old's disk fails,
> and the broker starts. In this case broker doesn't even know if log_dir_new
> has all the data needed for this replica. It becomes a problem if the
> broker is elected leader of this partition in this case.

log_dir_new contains the most recent data so we will lose the tail of the partition.
This is not a big problem for us because we already delete tails by hand (see
https://issues.apache.org/jira/browse/KAFKA-1712).
Also we don't use automatic leader balancing
(auto.leader.rebalance.enable=false), so this partition becomes the leader with
a low probability.
I think my patch can be modified to prohibit the selection of the leader until
the partition has moved completely.

>
> The solution presented in the KIP attempts to handle it by replacing
> replica in an atomic version fashion after the log in the new dir has fully
> caught up with the log in the old dir. At at time the log can be considered
> to exist on only one log directory.

As I understand your solution does not cover quotas.
What happens if someone starts to transfer 100 partitions ? 

> If yes, it will read a ByteBufferMessageSet from topicPartition.log and 
> append the message set to topicPartition.move

i.e. processPartitionData will read data from the beginning of 
topicPartition.log? What is the read size?
A ReplicaFetchThread reads many partitions so if one does some complicated work 
(= read a lot of data from disk) everything will slow down.
I think read size should not be very big. 

On the other hand at this point (processPartitionData) one can use only the new 
data (ByteBufferMessageSet  from parameters) and wait until 
(topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset && 
topicPartition.move.largestOffset == topicPartition.log.largestOffset).
In this case the write speed to topicPartition.move and topicPartition.log will 
be the same so this will allow us to move many partitions to one disk.
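
A minimal sketch of that catch-up check; the Log type below is reduced to just
the two offsets involved and is not the real Kafka Log class.

final class LogOffsets {
    final long smallestOffset;
    final long largestOffset;

    LogOffsets(long smallestOffset, long largestOffset) {
        this.smallestOffset = smallestOffset;
        this.largestOffset = largestOffset;
    }
}

final class MoveProgress {
    // The move is complete once the destination log covers at least the same
    // offset range as the source log for this partition.
    static boolean caughtUp(LogOffsets move, LogOffsets log) {
        return move.smallestOffset <= log.smallestOffset
                && move.largestOffset == log.largestOffset;
    }
}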

>
> And to answer your question, yes topicpartition.log refers to
> topic-paritition/segment.log.
>
> Thanks,
> Dong
>
> On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky 
> wrote:
>
>>  Hi,
>>
>>  We have the similar solution that have been working in production since
>>  2014. You can see it here: https://github.com/resetius/ka
>>  fka/commit/20658593e246d2184906879defa2e763c4d413fb
>>  The idea is very simple
>>  1. Disk balancer runs in a separate thread inside scheduler pool.
>>  2. It does not touch empty partitions
>>  3. Before it moves a partition it forcibly creates new segment on a
>>  destination disk
>>  4. It moves segment by segment from new to old.
>>  5. Log class works with segments on both disks
>>
>>  Your approach seems too complicated, moreover it means that you have to
>>  patch different components of the system
>>  Could you clarify what do you mean by topicPartition.log? Is it
>>  topic-paritition/segment.log ?
>>
>>  12.01.2017, 21:47, "Dong Lin" :
>>  > Hi all,
>>  >
>>  > We created KIP-113: Support replicas movement between log directories.
>>  > Please find the KIP wiki in the link
>>  > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%
>>  3A+Support+replicas+movement+between+log+directories
>>  > >  3A+Support+replicas+movement+between+log+directories>.*
>>  >
>>  > This KIP is related to KIP-112
>>  > >  3A+Handle+disk+failure+for+JBOD>:
>>  > Handle disk failure for JBOD. They are needed in order to support JBOD in
>>  > Kafka. Please help review the KIP. You feedback is appreciated!
>>  >
>>  > Thanks,
>>  > Dong


Re: [DISCUSS] KIP-111 : Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-01-23 Thread Rajini Sivaram
Hi Mayuresh,

The part about exposing Principal from custom PrincipalBuilder to custom
Authorizer sounds good. The definition of ACLs using kafka-acls.sh is less
clear to me. It feels like the goal is to expose custom Principal as an
opaque object between PrincipalBuilder and Authorizer so that Kafka doesn't
really need to know anything about additional stuff added for
customization. But kafka-acls.sh is expecting a key-value map from which
Principal is constructed. This is a breaking change to the PrincipalBuilder
interface - and I am not sure what it achieves.

1. A custom Principal is (a) created during authentication using custom
PrincipalBuilder (b) checked during authorization using Principal.equals()
and (c) stored in Zookeeper using Principal.toString(). Is that correct?
2. Is the reason for the new parameters in kafka-acls.sh and the breaking
change in PrincipalBuilder interface to enable users to specify a Principal
using properties rather than create the String in 1c) themselves?
3. Since the purpose of the new PrincipalBuilder method
buildPrincipal(Map principalConfigs) is to create a new Principal from command line
parameters, perhaps Properties or Map would be more
appropriate?

Regards,

Rajini
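
For reference, an illustrative sketch of the builder shape under discussion; the
interface, the method signatures and the use of java.security.Principal are
assumptions based on this thread, not the final KIP or Kafka code.

import java.security.Principal;
import java.util.Map;

interface CustomizablePrincipalBuilder {
    // Existing-style method: build the Principal during authentication on the broker.
    Principal buildPrincipal(Object transportLayer, Object authenticator);

    // Proposed addition: rebuild an equivalent Principal from key-value properties
    // supplied on the kafka-acls.sh command line, so ACLs can reference it.
    Principal buildPrincipal(Map<String, ?> principalConfigs);
}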


On Sat, Jan 21, 2017 at 5:50 PM, radai  wrote:

> LGTM.
>
> Kafka currently allows setting both a custom PrincipalBuilder and a custom
> Authorizer (expected to act on the output of the principal builder) but
> makes the naive assumption that any and all information about a (custom)
> principal is solely contained in its name property. this kip addresses
> that.
>
> On Fri, Jan 20, 2017 at 4:15 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi,
> >
> > Just wanted to see if anyone had any concerns with this KIP.
> > I would like to put this to vote soon, if there are no concerns.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Jan 12, 2017 at 11:21 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Ismael,
> > >
> > > Fair point. I will update it.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Thu, Jan 12, 2017 at 11:07 AM, Ismael Juma 
> wrote:
> > >
> > >> Hi Mayuresh,
> > >>
> > >> Thanks for the KIP. A quick comment before I do a more detailed
> > analysis,
> > >> the KIP says:
> > >>
> > >> `This KIP is a pure addition to existing functionality and does not
> > >> include
> > >> any backward incompatible changes.`
> > >>
> > >> However, the KIP is proposing the addition of a method to the
> > >> PrincipalBuilder pluggable interface, which is not a compatible
> change.
> > >> Existing implementations would no longer compile, for example. It
> would
> > be
> > >> good to make this clear in the KIP.
> > >>
> > >> Ismael
> > >>
> > >> On Thu, Jan 12, 2017 at 5:44 PM, Mayuresh Gharat <
> > >> gharatmayures...@gmail.com
> > >> > wrote:
> > >>
> > >> > Hi all.
> > >> >
> > >> > We created KIP-111 to propose that Kafka should preserve the
> Principal
> > >> > generated by the PrincipalBuilder while processing the request
> > received
> > >> on
> > >> > socket channel, on the broker.
> > >> >
> > >> > Please find the KIP wiki in the link
> > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?
> > >> pageId=67638388.
> > >> > We would love to hear your comments and suggestions.
> > >> >
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Mayuresh
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>


Re: KafkaException: Message payload is null shutting down broker

2017-01-23 Thread Rodrigo Queiroz Saramago
Hi Ismael,

thank you very much. I created the issue:
https://issues.apache.org/jira/browse/KAFKA-4686.



2017-01-23 11:46 GMT-02:00 Ismael Juma :

> Hi Rodrigo,
>
> Please file a JIRA so that this can be investigated.
>
> Ismael
>
> On Mon, Jan 23, 2017 at 1:32 PM, Rodrigo Queiroz Saramago <
> rodrigo.saram...@zup.com.br> wrote:
>
> > Hello, I have a test environment with 3 brokers and 1 zookeeper node, in
> > which clients connect using two-way ssl authentication. I use kafka
> version
> > 0.10.1.1, the system works as expected for a while, but if the node goes
> > down and then is restarted, something gets corrupted and it is not
> > possible to start the broker again; it always fails with the same error.
> > What does this error mean? What can I do in this case? Is this the expected behavior?
> >
> > [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads
> > during logs loading: kafka.common.KafkaException: Message payload is
> null:
> > Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload
> =
> > null) (kafka.log.LogManager)
> > [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup.
> > Prepare to shutdown (kafka.server.KafkaServer)
> > kafka.common.KafkaException: Message payload is null: Message(magic = 0,
> > attributes = 1, crc = 4122289508, key = null, payload = null)
> > at kafka.message.ByteBufferMessageSet$$anon$1.<
> > init>(ByteBufferMessageSet.scala:90)
> > at kafka.message.ByteBufferMessageSet$.deepIterator(
> > ByteBufferMessageSet.scala:85)
> > at kafka.message.MessageAndOffset.firstOffset(
> > MessageAndOffset.scala:33)
> > at kafka.log.LogSegment.recover(LogSegment.scala:223)
> > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> > at scala.collection.TraversableLike$WithFilter$$
> > anonfun$foreach$1.apply(TraversableLike.scala:733)
> > at scala.collection.IndexedSeqOptimized$class.
> > foreach(IndexedSeqOptimized.scala:33)
> > at scala.collection.mutable.ArrayOps$ofRef.foreach(
> ArrayOps.scala:186)
> > at scala.collection.TraversableLike$WithFilter.
> > foreach(TraversableLike.scala:732)
> > at kafka.log.Log.loadSegments(Log.scala:179)
> > at kafka.log.Log.<init>(Log.scala:108)
> > at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$
> > anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> > at java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> > [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
> > [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread.
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2017-01-23 07:03:28,954] INFO EventThread shut down for session:
> > 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
> > [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed
> > (org.apache.zookeeper.ZooKeeper)
> > [2017-01-23 07:03:28,957] INFO shut down completed
> > (kafka.server.KafkaServer)
> > [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable
> > startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > kafka.common.KafkaException: Message payload is null: Message(magic = 0,
> > attributes = 1, crc = 4122289508, key = null, payload = null)
> > at kafka.message.ByteBufferMessageSet$$anon$1.<
> > init>(ByteBufferMessageSet.scala:90)
> > at kafka.message.ByteBufferMessageSet$.deepIterator(
> > ByteBufferMessageSet.scala:85)
> > at kafka.message.MessageAndOffset.firstOffset(
> > MessageAndOffset.scala:33)
> > at kafka.log.LogSegment.recover(LogSegment.scala:223)
> > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> > at scala.collection.TraversableLike$WithFilter$$
> > anonfun$foreach$1.apply(TraversableLike.scala:733)
> > at scala.collection.IndexedSeqOptimized$class.
> > foreach(IndexedSeqOptimized.scala:33)
> > at scala.collection.mutable.ArrayOps$ofRef.foreach(
> ArrayOps.scala:186)
> > at scala.collection.TraversableLike$WithFilter.
> > foreach(TraversableLike.scala:732)
> > at kafka.log.Log.loadSegments(Log.scala:179)
> > at kafka.log.Log.<init>(Log.scala:108)
> > at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$
> > anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> > at java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > at 

[jira] [Created] (KAFKA-4686) Null Message payload is shutting down broker

2017-01-23 Thread Rodrigo Queiroz Saramago (JIRA)
Rodrigo Queiroz Saramago created KAFKA-4686:
---

 Summary: Null Message payload is shutting down broker
 Key: KAFKA-4686
 URL: https://issues.apache.org/jira/browse/KAFKA-4686
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1
 Environment: Amazon Linux AMI release 2016.03 kernel 
4.4.19-29.55.amzn1.x86_64
Reporter: Rodrigo Queiroz Saramago
 Attachments: kafkaServer.out

Hello, I have a test environment with 3 brokers and 1 zookeeper node, in which
clients connect using two-way ssl authentication. I use kafka version 0.10.1.1,
and the system works as expected for a while, but if the broker goes down and then
is restarted, something gets corrupted and it is not possible to start the broker
again; it always fails with the same error. What does this error mean? What can I
do in this case? Is this the expected behavior?

[2017-01-23 07:03:28,927] ERROR There was an error in one of the threads during 
logs loading: kafka.common.KafkaException: Message payload is null: 
Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = 
null) (kafka.log.LogManager)
[2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup. Prepare 
to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
attributes = 1, crc = 4122289508, key = null, payload = null)
at 
kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
at 
kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
at kafka.log.LogSegment.recover(LogSegment.scala:223)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
[2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2017-01-23 07:03:28,954] INFO EventThread shut down for session: 
0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
[2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed 
(org.apache.zookeeper.ZooKeeper)
[2017-01-23 07:03:28,957] INFO shut down completed (kafka.server.KafkaServer)
[2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable 
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
attributes = 1, crc = 4122289508, key = null, payload = null)
at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
at kafka.log.LogSegment.recover(LogSegment.scala:223)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 

Re: KafkaException: Message payload is null shutting down broker

2017-01-23 Thread Ismael Juma
Hi Rodrigo,

Please file a JIRA so that this can be investigated.

Ismael

On Mon, Jan 23, 2017 at 1:32 PM, Rodrigo Queiroz Saramago <
rodrigo.saram...@zup.com.br> wrote:

> Hello, I have a test environment with 3 brokers and 1 ZooKeeper node, in
> which clients connect using two-way SSL authentication. I use Kafka version
> 0.10.1.1. The system works as expected for a while, but if the node goes
> down and is then restarted, something gets corrupted and it is not possible
> to start the broker again; it always fails with the same error. What does
> this error mean? What can I do in this case? Is this the expected behavior?
>
> [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads
> during logs loading: kafka.common.KafkaException: Message payload is null:
> Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload =
> null) (kafka.log.LogManager)
> [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup.
> Prepare to shutdown (kafka.server.KafkaServer)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0,
> attributes = 1, crc = 4122289508, key = null, payload = null)
> at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
> at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
> at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
> at kafka.log.LogSegment.recover(LogSegment.scala:223)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at kafka.log.Log.loadSegments(Log.scala:179)
> at kafka.log.Log.<init>(Log.scala:108)
> at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread.
> (org.I0Itec.zkclient.ZkEventThread)
> [2017-01-23 07:03:28,954] INFO EventThread shut down for session:
> 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
> [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed
> (org.apache.zookeeper.ZooKeeper)
> [2017-01-23 07:03:28,957] INFO shut down completed
> (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0,
> attributes = 1, crc = 4122289508, key = null, payload = null)
> at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
> at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
> at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
> at kafka.log.LogSegment.recover(LogSegment.scala:223)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at kafka.log.Log.loadSegments(Log.scala:179)
> at kafka.log.Log.<init>(Log.scala:108)
> at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,961] INFO shutting down (kafka.server.KafkaServer)
>
>
> Best regards

KafkaException: Message payload is null shutting down broker

2017-01-23 Thread Rodrigo Queiroz Saramago
Hello, I have a test environment with 3 brokers and 1 ZooKeeper node, in
which clients connect using two-way SSL authentication. I use Kafka version
0.10.1.1. The system works as expected for a while, but if the node goes
down and is then restarted, something gets corrupted and it is not possible
to start the broker again; it always fails with the same error. What does
this error mean? What can I do in this case? Is this the expected behavior?

[2017-01-23 07:03:28,927] ERROR There was an error in one of the threads
during logs loading: kafka.common.KafkaException: Message payload is null:
Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload =
null) (kafka.log.LogManager)
[2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup.
Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Message payload is null: Message(magic = 0,
attributes = 1, crc = 4122289508, key = null, payload = null)
at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
at kafka.log.LogSegment.recover(LogSegment.scala:223)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
[2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2017-01-23 07:03:28,954] INFO EventThread shut down for session:
0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
[2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed
(org.apache.zookeeper.ZooKeeper)
[2017-01-23 07:03:28,957] INFO shut down completed
(kafka.server.KafkaServer)
[2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Message payload is null: Message(magic = 0,
attributes = 1, crc = 4122289508, key = null, payload = null)
at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
at kafka.log.LogSegment.recover(LogSegment.scala:223)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegments(Log.scala:179)
at kafka.log.Log.<init>(Log.scala:108)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-01-23 07:03:28,961] INFO shutting down (kafka.server.KafkaServer)


Best regards
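
For context, a null payload by itself is not invalid: tombstone records for log-compacted topics are produced exactly this way, and attributes = 1 in the failing Message indicates a compressed wrapper. The failure above happens later, while the broker re-reads such data during log recovery. A minimal producer sketch (broker address, topic and key are placeholders, not taken from the report above):

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("compression.type", "gzip"); // compressed batches wrap records, as in the failing message

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A tombstone: non-null key, null value. This is legal input for the broker;
            // it is only the recovery of already-written data that fails above.
            producer.send(new ProducerRecord<>("example-topic", "some-key", null));
            producer.flush();
        }
    }
}
{code}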


[jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

2017-01-23 Thread postmas...@inn.ru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834463#comment-15834463
 ] 

postmas...@inn.ru commented on KAFKA-1207:
--

Delivery is delayed to these recipients or groups:

e...@inn.ru

Subject: [jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

This message hasn't been delivered yet. Delivery will continue to be attempted.

The server will keep trying to deliver this message for the next 1 days, 19 
hours and 51 minutes. You'll be notified if the message can't be delivered by 
that time.







Diagnostic information for administrators:

Generating server: lc-exch-04.inn.local
Receiving server: inn.ru (109.105.153.25)

e...@inn.ru
Server at inn.ru (109.105.153.25) returned '400 4.4.7 Message delayed'
1/23/2017 1:03:06 PM - Server at inn.ru (109.105.153.25) returned '441 4.4.1 
Error communicating with target host: "Failed to connect. Winsock error code: 
10060, Win32 error code: 10060." Last endpoint attempted was 109.105.153.25:25'




> Launch Kafka from within Apache Mesos
> -
>
> Key: KAFKA-1207
> URL: https://issues.apache.org/jira/browse/KAFKA-1207
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>  Labels: mesos
> Attachments: KAFKA-1207_2014-01-19_00:04:58.patch, 
> KAFKA-1207_2014-01-19_00:48:49.patch, KAFKA-1207.patch
>
>
> There are a few components to 

[jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

2017-01-23 Thread postmas...@inn.ru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834464#comment-15834464
 ] 

postmas...@inn.ru commented on KAFKA-1207:
--

Delivery is delayed to these recipients or groups:

e...@inn.ru

Subject: [jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

This message hasn't been delivered yet. Delivery will continue to be attempted.

The server will keep trying to deliver this message for the next 1 days, 19 
hours and 58 minutes. You'll be notified if the message can't be delivered by 
that time.







Diagnostic information for administrators:

Generating server: lc-exch-04.inn.local
Receiving server: inn.ru (109.105.153.25)

e...@inn.ru
Server at inn.ru (109.105.153.25) returned '400 4.4.7 Message delayed'
1/23/2017 1:03:06 PM - Server at inn.ru (109.105.153.25) returned '441 4.4.1 
Error communicating with target host: "Failed to connect. Winsock error code: 
10060, Win32 error code: 10060." Last endpoint attempted was 109.105.153.25:25'




> Launch Kafka from within Apache Mesos
> -
>
> Key: KAFKA-1207
> URL: https://issues.apache.org/jira/browse/KAFKA-1207
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>  Labels: mesos
> Attachments: KAFKA-1207_2014-01-19_00:04:58.patch, 
> KAFKA-1207_2014-01-19_00:48:49.patch, KAFKA-1207.patch
>
>
> There are a few components to 

[jira] [Commented] (KAFKA-4679) Remove unstable markers from Connect APIs

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834456#comment-15834456
 ] 

ASF GitHub Bot commented on KAFKA-4679:
---

GitHub user baluchicken opened a pull request:

https://github.com/apache/kafka/pull/2423

KAFKA-4679 Remove unstable markers from Connect APIs

@ewencp ignore this PR if you have already started to work on this ticket.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baluchicken/kafka-1 KAFKA-4679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2423.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2423


commit 9e680730a5bf14a50f075c7d1e71cd9e418e8a27
Author: Balint Molnar 
Date:   2017-01-23T13:09:04Z

KAFKA-4679 Remove unstable markers from Connect APIs




> Remove unstable markers from Connect APIs
> -
>
> Key: KAFKA-4679
> URL: https://issues.apache.org/jira/browse/KAFKA-4679
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.2.0
>
>
> Connect has had a stable API for a while now and we are careful about 
> compatibility. It's safe to remove the unstable markers now.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2423: KAFKA-4679 Remove unstable markers from Connect AP...

2017-01-23 Thread baluchicken
GitHub user baluchicken opened a pull request:

https://github.com/apache/kafka/pull/2423

KAFKA-4679 Remove unstable markers from Connect APIs

@ewencp ignore this PR if you have already started to work on this ticket.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baluchicken/kafka-1 KAFKA-4679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2423.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2423


commit 9e680730a5bf14a50f075c7d1e71cd9e418e8a27
Author: Balint Molnar 
Date:   2017-01-23T13:09:04Z

KAFKA-4679 Remove unstable markers from Connect APIs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4679) Remove unstable markers from Connect APIs

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834440#comment-15834440
 ] 

ASF GitHub Bot commented on KAFKA-4679:
---

Github user baluchicken closed the pull request at:

https://github.com/apache/kafka/pull/2422


> Remove unstable markers from Connect APIs
> -
>
> Key: KAFKA-4679
> URL: https://issues.apache.org/jira/browse/KAFKA-4679
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.2.0
>
>
> Connect has had a stable API for a while now and we are careful about 
> compatibility. It's safe to remove the unstable markers now.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2422: KAFKA-4679 Remove unstable markers from Connect AP...

2017-01-23 Thread baluchicken
Github user baluchicken closed the pull request at:

https://github.com/apache/kafka/pull/2422


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4679) Remove unstable markers from Connect APIs

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834437#comment-15834437
 ] 

ASF GitHub Bot commented on KAFKA-4679:
---

GitHub user baluchicken opened a pull request:

https://github.com/apache/kafka/pull/2422

KAFKA-4679 Remove unstable markers from Connect APIs

@ewencp ignore this PR if you have already started to work on this ticket.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baluchicken/kafka-1 KAFKA-4679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2422


commit 42f5fc080ace8341ff7d405c089ceebd26048c6b
Author: Balint Molnar 
Date:   2017-01-23T12:59:25Z

KAFKA-4679 Remove unstable markers from Connect APIs




> Remove unstable markers from Connect APIs
> -
>
> Key: KAFKA-4679
> URL: https://issues.apache.org/jira/browse/KAFKA-4679
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.2.0
>
>
> Connect has had a stable API for a while now and we are careful about 
> compatibility. It's safe to remove the unstable markers now.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2422: KAFKA-4679 Remove unstable markers from Connect AP...

2017-01-23 Thread baluchicken
GitHub user baluchicken opened a pull request:

https://github.com/apache/kafka/pull/2422

KAFKA-4679 Remove unstable markers from Connect APIs

@ewencp ignore this PR if you have already started to work on this ticket.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baluchicken/kafka-1 KAFKA-4679

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2422


commit 42f5fc080ace8341ff7d405c089ceebd26048c6b
Author: Balint Molnar 
Date:   2017-01-23T12:59:25Z

KAFKA-4679 Remove unstable markers from Connect APIs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2421: HOTFIX: Fix cmd line argument order in streams sys...

2017-01-23 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2421

HOTFIX: Fix cmd line argument order in streams system tests

Fix the ordering of cmd line arguments passed to the system tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka system-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2421


commit 399a99074fe623f5dda1c9fdc603f2cccfcac6bb
Author: Damian Guy 
Date:   2017-01-23T11:46:38Z

add arg to system test

commit 75165327a1df096047a0d97abd75a6f303fa1e2c
Author: Damian Guy 
Date:   2017-01-23T12:08:59Z

blah




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4679) Remove unstable markers from Connect APIs

2017-01-23 Thread Balint Molnar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834402#comment-15834402
 ] 

Balint Molnar commented on KAFKA-4679:
--

[~ewencp] can I help with this?

> Remove unstable markers from Connect APIs
> -
>
> Key: KAFKA-4679
> URL: https://issues.apache.org/jira/browse/KAFKA-4679
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.10.2.0
>
>
> Connect has had a stable API for a while now and we are careful about 
> compatibility. It's safe to remove the unstable markers now.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-108: Create Topic Policy

2017-01-23 Thread Ismael Juma
Hi all,

During PR review a few minor improvements to the KIP were suggested:

1. Rename TopicDetails to RequestMetadata and make it an inner class of the
CreateTopicPolicy interface. By using a more generic name, we have more
flexibility with regards to additional data we may want to pass to the
policy in the future without it looking out of place (e.g. session
information).

2. Introduce PolicyViolationException with its own error code instead of
reusing InvalidRequestException because the latter's documentation is a bit
too specific (it mentions malformed client requests, for example).

3. Fixed config name to be `create.topic.policy.class.name` to match the
name of the interface and the fact that the policy works with a single
topic (it was previously `create.topics.policy.class.name`).

I updated the KIP page to include these changes:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy

Hopefully these are uncontroversial, but please let me know if you disagree.

Ismael
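
For illustration, a policy under the renamed interface might look roughly like the sketch below. The package, accessor names and exception follow the KIP text above and could differ slightly from the final code; the class name and the 64-partition cap are made up.

{code}
import java.util.Map;

import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Rejects topic creation requests that ask for more than a fixed number of partitions.
public class MaxPartitionsPolicy implements CreateTopicPolicy {

    private static final int MAX_PARTITIONS = 64; // made-up limit

    @Override
    public void configure(Map<String, ?> configs) {
        // A real policy could read its limit from the broker configuration here.
    }

    @Override
    public void validate(CreateTopicPolicy.RequestMetadata requestMetadata)
            throws PolicyViolationException {
        // RequestMetadata is the inner class described in point 1 above.
        if (requestMetadata.numPartitions() > MAX_PARTITIONS) {
            throw new PolicyViolationException("Topic " + requestMetadata.topic()
                    + " requests " + requestMetadata.numPartitions()
                    + " partitions, but at most " + MAX_PARTITIONS + " are allowed");
        }
    }

    @Override
    public void close() {
        // Nothing to clean up in this sketch.
    }
}
{code}

Such a class would then be enabled on the brokers via the `create.topic.policy.class.name` setting from point 3.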

On Wed, Jan 11, 2017 at 11:18 PM, Ismael Juma  wrote:

> Thanks to everyone who voted and provided feedback.
>
> The vote has passed with 4 binding +1s (Joel, Gwen, Neha, Sriram) and 4
> non-binding +1s (Edoardo, Roger, Jon, Apurva).
>
> I have updated the relevant wiki pages.
>
> Ismael
>
> On Tue, Jan 10, 2017 at 1:00 AM, Sriram Subramanian 
> wrote:
>
>> +1
>>
>> On Mon, Jan 9, 2017 at 3:29 PM, Apurva Mehta  wrote:
>>
>> > (hit send too soon)
>> >
>> > +1 (non-binding).. that is a very well written KIP!
>> >
>> > On Mon, Jan 9, 2017 at 3:28 PM, Apurva Mehta 
>> wrote:
>> >
>> > > +1, that1
>> > >
>> > > On Mon, Jan 9, 2017 at 2:47 PM, Neha Narkhede 
>> wrote:
>> > >
>> > >> +1 - thanks Ismael!
>> > >>
>> > >> On Mon, Jan 9, 2017 at 2:30 PM Gwen Shapira 
>> wrote:
>> > >>
>> > >> > +1 - thanks for the proposal, I think it will be super useful for
>> > >> admins.
>> > >> >
>> > >> > On Sun, Jan 8, 2017 at 6:50 AM, Ismael Juma 
>> > wrote:
>> > >> > > Hi all,
>> > >> > >
>> > >> > > As the discussion seems to have settled down, I would like to
>> > initiate
>> > >> > the
>> > >> > > voting process for KIP-108: Create Topic Policy:
>> > >> > >
>> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-108
>> > >> > > %3A+Create+Topic+Policy
>> > >> > >
>> > >> > > The vote will run for a minimum of 72 hours.
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Ismael
>> > >> >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > Gwen Shapira
>> > >> > Product Manager | Confluent
>> > >> > 650.450.2760 | @gwenshap
>> > >> > Follow us: Twitter | blog
>> > >> >
>> > >> --
>> > >> Thanks,
>> > >> Neha
>> > >>
>> > >
>> > >
>> >
>>
>
>


[jira] [Commented] (KAFKA-4677) Avoid unnecessary task movement across threads of the same process during rebalance

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834239#comment-15834239
 ] 

ASF GitHub Bot commented on KAFKA-4677:
---

Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/2411


> Avoid unnecessary task movement across threads of the same process during 
> rebalance
> ---
>
> Key: KAFKA-4677
> URL: https://issues.apache.org/jira/browse/KAFKA-4677
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 0.10.3.0
>
>
> StreamPartitionAssigner tries to follow a sticky assignment policy to avoid 
> expensive task migration. Currently, it does this on a best-effort basis.
> We observed a case in which tasks migrated for no good reason, so we assume 
> that the current implementation could be improved to be more sticky.
> The concrete scenario is as follows:
> assume we have a topology with 3 tasks: A, B, C
> assume we have 3 threads, each executing one task: 1-A, 2-B, 3-C
> for some reason, thread 1 goes down and a rebalance gets triggered
> threads 2 and 3 get their partitions revoked
> sometimes (not sure what the exact condition for this is), the new assignment 
> flips the assignment for tasks B and C (task A is newly assigned to either 
> thread 2 or 3)
> -> possible new assignment 2-(A,C) and 3-B
> There is no obvious reason (like load balancing) why the task assignment for 
> B and C changes to the other thread, resulting in unnecessary task 
> migration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2411: [WIP] KAFKA-4677: Avoid unnecessary task movement ...

2017-01-23 Thread dguy
Github user dguy closed the pull request at:

https://github.com/apache/kafka/pull/2411


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Request to provide rights to assign JIRA tickets to myself

2017-01-23 Thread Akhilesh Naidu
Hi,


I would be interested in contributing to the Kafka project,

and am presently looking into the issue below:

KAFKA-4566
Can't Symlink to Kafka bins


Could someone please grant me the rights to assign JIRA tickets to myself?



Regards

Akhilesh







[jira] [Commented] (KAFKA-4685) All partitions offline, no controller znode in ZK

2017-01-23 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834209#comment-15834209
 ] 

Sinóros-Szabó Péter commented on KAFKA-4685:


Yes. I was able to check ZK only 1 hour after the issue, at about 8:00. At that 
time and still now:
{code}
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[]
[zk: localhost:2181(CONNECTED) 1] ls /controller
[]
[zk: localhost:2181(CONNECTED) 2]
{code}

I have checked the ZooKeeper logs and they show some errors that I do not 
understand (around 6:51), so it may be that this is a ZK issue and the Kafka 
nodes managed to register themselves successfully, but ZK lost some of the data.
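
The same check can also be done programmatically with the plain ZooKeeper client; a small sketch (the connection string is a placeholder) that distinguishes a missing /controller znode from an empty one:

{code}
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class ControllerCheck {
    public static void main(String[] args) throws Exception {
        // Same checks as the zkCli session above, against a placeholder address.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        try {
            List<String> brokers = zk.getChildren("/brokers/ids", false);
            System.out.println("registered brokers: " + brokers);
            try {
                byte[] controller = zk.getData("/controller", false, null);
                System.out.println("controller znode: " + new String(controller, "UTF-8"));
            } catch (KeeperException.NoNodeException e) {
                System.out.println("no /controller znode: no active controller is registered");
            }
        } finally {
            zk.close();
        }
    }
}
{code}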

> All partitions offline, no controller znode in ZK
> 
>
> Key: KAFKA-4685
> URL: https://issues.apache.org/jira/browse/KAFKA-4685
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sinóros-Szabó Péter
> Attachments: kafka-0-logs.zip, kafka-1-logs.zip, kafka-2-logs.zip, 
> zookeeper-logs.zip
>
>
> Setup: 3 Kafka 0.11.1.1 nodes on kubernetes (in AWS), and another 3 nodes of 
> Zookeeper 3.5.2-alpha also in kubernetes (in AWS).
> At 2017-01-23 06:51 ZK sessions expired. It seems from the logs that kafka-2 
> was elected as the new controller, but I am not sure how to read that logs.
> I've checked the ZK data and both the /controller is empty and also the 
> /brokers/ids is empty. Kafka reports that all partitions are offline, 
> although it seems to be working because messages are coming and going.
> We are using an alpha version, I know that it may be a problem, but I suppose 
> that Kafka should see that there is not any node registered as controller.
> I have attached the Kafka and ZK logs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2017-01-23 Thread Michal Turek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834170#comment-15834170
 ] 

Michal Turek commented on KAFKA-3450:
-

Hi Esoga, I don't think this issue is a duplicate of KAFKA-3539, or maybe only 
partially. If the producer writes to a non-existing topic and auto-creation 
is disabled, it will still be able to communicate with the brokers internally. The 
issue is that the broker returns "this topic doesn't exist" instead of "this 
topic doesn't exist *and it won't be automatically created*", so the client keeps 
retrying its request until the max.block.ms timeout occurs. The logic is 
missing in the producer/brokers; maybe the protocol doesn't support this 
at all.

{noformat}
2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 0 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 1 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 2 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 3 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 4 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 5 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 6 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.534 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 7 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.635 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 8 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
2016-03-23 12:44:38.736 WARN  o.a.kafka.clients.NetworkClient 
[kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
metadata with correlation id 9 : 
{ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
{noformat}

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Priority: Critical
>
> {{producer.send()}} is blocked for {{max.block.ms}} (default 60 seconds) if 
> the destination topic doesn't exist and if their automatic creation is 
> disabled. Warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is 
> logged every 100 ms in a loop until the 60 seconds timeout expires, but the 
> operation is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> /**
>  * Test of sending to a topic that does not exist while automatic creation of 
> topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(NoSuchTopicTest.class);
> public static void main(String[] args) {
> Properties properties = new Properties();
> properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
> properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
> 
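
The block in that test comes from the producer's `max.block.ms` (60 seconds by default, as described above). A minimal sketch of lowering it so a send to a missing topic gives up quickly; the broker address and the 5-second value are placeholders, and the exact way the exception surfaces may vary by client version:

{code}
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class FailFastOnMissingTopic {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Default is 60000 ms; lower it so metadata lookups for a topic that does
        // not exist (and will not be auto-created) give up after 5 seconds.
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "value")).get();
        } catch (ExecutionException | KafkaException e) {
            // With auto.create.topics.enable=false the metadata never arrives and the send
            // fails with a TimeoutException once max.block.ms expires (thrown directly or
            // wrapped in the returned future, depending on the client version).
            System.err.println("Send failed: " + e);
        }
    }
}
{code}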

[jira] [Commented] (KAFKA-4685) All partitions offline, no controller znode in ZK

2017-01-23 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834169#comment-15834169
 ] 

huxi commented on KAFKA-4685:
-

Server logs showed that all the brokers have come back to register in ZK again. 
See "Creating /brokers/ids/0,1,2" in the logs.  So did you mean /controller and 
/brokers/ids are still empty even after broker 2 was elected as the new 
controller? 

> All partitions offline, no controller znode in ZK
> 
>
> Key: KAFKA-4685
> URL: https://issues.apache.org/jira/browse/KAFKA-4685
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sinóros-Szabó Péter
> Attachments: kafka-0-logs.zip, kafka-1-logs.zip, kafka-2-logs.zip, 
> zookeeper-logs.zip
>
>
> Setup: 3 Kafka 0.11.1.1 nodes on kubernetes (in AWS), and another 3 nodes of 
> Zookeeper 3.5.2-alpha also in kubernetes (in AWS).
> At 2017-01-23 06:51 ZK sessions expired. It seems from the logs that kafka-2 
> was elected as the new controller, but I am not sure how to read that logs.
> I've checked the ZK data and both the /controller is empty and also the 
> /brokers/ids is empty. Kafka reports that all partitions are offline, 
> although it seems to be working because messages are coming and going.
> We are using an alpha version, I know that it may be a problem, but I suppose 
> that Kafka should see that there is not any node registered as controller.
> I have attached the Kafka and ZK logs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2363) ProducerSendTest.testCloseWithZeroTimeoutFromCallerThread Transient Failure

2017-01-23 Thread Ben Stopford (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Stopford reassigned KAFKA-2363:
---

Assignee: (was: Ben Stopford)

> ProducerSendTest.testCloseWithZeroTimeoutFromCallerThread Transient Failure
> ---
>
> Key: KAFKA-2363
> URL: https://issues.apache.org/jira/browse/KAFKA-2363
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Fangmin Lv
>  Labels: newbie, transient-unit-test-failure
> Fix For: 0.10.2.0
>
>
> {code}
> kafka.api.ProducerSendTest > testCloseWithZeroTimeoutFromCallerThread 
> STANDARD_OUT
> [2015-07-24 23:13:05,148] WARN fsync-ing the write ahead log in SyncThread:0 
> took 1084ms which will adversely effect operation latency. See the ZooKeeper 
> troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog:334)
> kafka.api.ProducerSendTest > testCloseWithZeroTimeoutFromCallerThread FAILED
> java.lang.AssertionError: No request is complete.
> at org.junit.Assert.fail(Assert.java:92)
> at org.junit.Assert.assertTrue(Assert.java:44)
> at 
> kafka.api.ProducerSendTest$$anonfun$testCloseWithZeroTimeoutFromCallerThread$1.apply$mcVI$sp(ProducerSendTest.scala:343)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at 
> kafka.api.ProducerSendTest.testCloseWithZeroTimeoutFromCallerThread(ProducerSendTest.scala:340)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-23 Thread Damian Guy
I'm not a fan of materialize. I think it interrupts the flow, i.e.,

table.mapValues(..).materialize().join(..).materialize()
compared to:
table.mapValues(..).join(..)

I know which one I prefer.
My preference is still to provide overloaded methods where people can
specify the store names if they want; otherwise we just generate them.
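
Roughly, the store-name parameters under discussion look like this in today's API (topic and store names below are placeholders; `builder.table("topic", null)` is the form referenced later in the thread):

{code}
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StoreNamesToday {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Source tables take an explicit state store name; passing null means
        // "do not materialize", and a later join on such a table only fails at runtime.
        KTable<String, String> users = builder.table("users-topic", "users-store");
        KTable<String, String> notQueryable = builder.table("other-topic", null);

        // Derived tables: whether operations like mapValues() should also take
        // (or generate) store names is the overload-vs-materialize() question above.
        KTable<String, Integer> nameLengths = users.mapValues(String::length);
    }
}
{code}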

On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax  wrote:

> Hi,
>
> thanks for the KIP Eno! Here are my 2 cents:
>
> 1) I like Guozhang's proposal about removing store name from all KTable
> methods and generate internal names (however, I would do this as
> overloads). Furthermore, I would not force users to call .materialize()
> if they want to query a store, but add one more method .stateStoreName()
> that returns the store name if the KTable is materialized. Thus, also
> .materialize() must not necessarily have a parameter storeName (ie, we
> should have some overloads here).
>
> I would also not allow to provide a null store name (to indicate no
> materialization if not necessary) but throw an exception.
>
> This yields some simplification (see below).
>
>
> 2) I also like Guozhang's proposal about KStream#toTable()
>
>
> 3)
> >
> >>   3. What will happen when you call materialize on KTable that is
> already
> >>   materialized? Will it create another StateStore (providing the name is
> >>   different), throw an Exception?
> >
> > Currently an exception is thrown, but see below.
> >
> >
>
> If we follow approach (1) from Guozhang, there is no need to worry about
> a second materialization and also no exception must be throws. A call to
> .materialize() basically sets a "materialized flag" (ie, idempotent
> operation) and sets a new name.
>
>
> 4)
> >> Rename toStream() to toKStream() for consistency.
> >
> > Not sure whether that is really required. We also use
> > `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
> > don't care about the "K" prefix.
>
> Eno's reply:
> > I think changing it to `toKStream` would make it absolutely clear what
> we are converting it to.
> >
> > I'd say we should probably change the KStreamBuilder methods (but not in
> this KIP).
>
> I would keep #toStream(). (see below)
>
>
> 5) We should not remove any methods but only deprecate them.
>
>
>
> A general note:
>
> I do not understand your comments "Rejected Alternatives". You say "Have
> the KTable be the materialized view" was rejected. But your KIP actually
> does exactly this -- the changelog abstraction of KTable is secondary
> after those changes and the "view" abstraction is what a KTable is. And
> just to be clear, I like this a lot:
>
>  - it aligns with the name KTable
>  - is aligns with stream-table-duality
>  - it aligns with IQ
>
> I would say that a KTable is a "view abstraction" (as materialization is
> optional).
>
>
>
> -Matthias
>
>
>
>
> On 1/22/17 5:05 PM, Guozhang Wang wrote:
> > Thanks for the KIP Eno, I have a few meta comments and a few detailed
> > comments:
> >
> > 1. I like the materialize() function in general, but I would like to see
> > how other KTable functions should be updated accordingly. For example, 1)
> > KStreamBuilder.table(..) has a state store name parameter, and we will
> > always materialize the KTable unless its state store name is set to null;
> > 2) KTable.agg requires the result KTable to be materialized, and hence it
> > also have a state store name; 3) KTable.join requires the joining table
> to
> > be materialized. And today we do not actually have a mechanism to enforce
> > that, but will only throw an exception at runtime if it is not (e.g. if
> you
> > have "builder.table("topic", null).join()" a RTE will be thrown).
> >
> > I'd make an extended proposal just to kick off the discussion here: let's
> > remove all the state store params in other KTable functions, and if in
> some
> > cases KTable have to be materialized (e.g. KTable resulted from KXX.agg)
> > and users do not call materialize(), then we treat it as "users are not
> > interested in querying it at all" and hence use an internal name
> generated
> > for the materialized KTable; i.e. although it is materialized the state
> > store is not exposed to users. And if users call materialize() afterwards
> > but we have already decided to materialize it, we can replace the
> internal
> > name with the user's provided names. Then from a user's point-view, if
> they
> > ever want to query a KTable, they have to call materialize() with a given
> > state store name. This approach has one awkwardness though, that serdes
> and
> > state store names param are not separated and could be overlapped (see
> > detailed comment #2 below).
> >
> >
> > 2. This step does not need to be included in this KIP, but just as a
> > reference / future work: as we have discussed before, we may enforce
> > materialize KTable.join resulted KTables as well in the future. If we do
> > that, then:
> >
> > a) KXX.agg resulted KTables are always materialized;
> > b) KTable.agg 

[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-01-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834117#comment-15834117
 ] 

Ismael Juma commented on KAFKA-4669:


The reason why there's no try/catch in `handleCompletedReceives` is that this 
error is not meant to happen. Clearly it is happening though, so we should 
think how to handle that in a way that works for all users of `NetworkClient`. 
[~junrao], [~hachikuji], any thoughts?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> There is no try catch in NetworkClient.handleCompletedReceives.  If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never get called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked 0.10 code and think this bug does exist in 0.10 versions.
> A real case.  First a correlateId not match exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does 
> not match request (703764)
>   at 
> org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
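
Until the root cause is fixed, a caller that cannot tolerate an indefinite hang can guard flush() with its own timeout. The sketch below is only a generic workaround (executor setup and timeout are arbitrary): it unblocks the calling thread, but the stuck RecordBatch itself is still never completed.

{code}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;

public class GuardedFlush {
    private static final ExecutorService FLUSHER = Executors.newSingleThreadExecutor();

    // Runs producer.flush() on a helper thread and gives up after the timeout,
    // so a batch whose callback is never invoked does not block the caller forever.
    static boolean flushWithTimeout(KafkaProducer<?, ?> producer, long timeoutMs)
            throws InterruptedException {
        Future<?> result = FLUSHER.submit(producer::flush);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // best effort; the flush may still be stuck internally
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("flush failed", e.getCause());
        }
    }
}
{code}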


[jira] [Created] (KAFKA-4685) All partitions offline, no controller znode in ZK

2017-01-23 Thread JIRA
Sinóros-Szabó Péter created KAFKA-4685:
--

 Summary: All partitions offline, no controller znode in ZK
 Key: KAFKA-4685
 URL: https://issues.apache.org/jira/browse/KAFKA-4685
 Project: Kafka
  Issue Type: Bug
Reporter: Sinóros-Szabó Péter
 Attachments: kafka-0-logs.zip, kafka-1-logs.zip, kafka-2-logs.zip, 
zookeeper-logs.zip

Setup: 3 Kafka 0.11.1.1 nodes on kubernetes (in AWS), and another 3 nodes of 
Zookeeper 3.5.2-alpha also in kubernetes (in AWS).

At 2017-01-23 06:51 ZK sessions expired. It seems from the logs that kafka-2 
was elected as the new controller, but I am not sure how to read that logs.

I've checked the ZK data and both the /controller is empty and also the 
/brokers/ids is empty. Kafka reports that all partitions are offline, although 
it seems to be working because messages are coming and going.

We are using an alpha version, I know that it may be a problem, but I suppose 
that Kafka should see that there is not any node registered as controller.

I have attached the Kafka and ZK logs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

2017-01-23 Thread postmas...@inn.ru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834096#comment-15834096
 ] 

postmas...@inn.ru commented on KAFKA-1207:
--

Delivery is delayed to these recipients or groups:

e...@inn.ru

Subject: [jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

This message hasn't been delivered yet. Delivery will continue to be attempted.

The server will keep trying to deliver this message for the next 1 days, 19 
hours and 51 minutes. You'll be notified if the message can't be delivered by 
that time.







Diagnostic information for administrators:

Generating server: lc-exch-02.inn.local
Receiving server: inn.ru (109.105.153.25)

e...@inn.ru
Server at inn.ru (109.105.153.25) returned '400 4.4.7 Message delayed'
1/23/2017 8:59:53 AM - Server at inn.ru (109.105.153.25) returned '441 4.4.1 
Error communicating with target host: "Failed to connect. Winsock error code: 
10060, Win32 error code: 10060." Last endpoint attempted was 109.105.153.25:25'




> Launch Kafka from within Apache Mesos
> -
>
> Key: KAFKA-1207
> URL: https://issues.apache.org/jira/browse/KAFKA-1207
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>  Labels: mesos
> Attachments: KAFKA-1207_2014-01-19_00:04:58.patch, 
> KAFKA-1207_2014-01-19_00:48:49.patch, KAFKA-1207.patch
>
>
> There are a few components to 

[jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

2017-01-23 Thread postmas...@inn.ru (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834090#comment-15834090
 ] 

postmas...@inn.ru commented on KAFKA-1207:
--

Delivery is delayed to these recipients or groups:

e...@inn.ru

Subject: [jira] [Commented] (KAFKA-1207) Launch Kafka from within Apache Mesos

This message hasn't been delivered yet. Delivery will continue to be attempted.

The server will keep trying to deliver this message for the next 1 day, 19 
hours and 58 minutes. You'll be notified if the message can't be delivered by 
that time.

Diagnostic information for administrators:

Generating server: lc-exch-04.inn.local
Receiving server: inn.ru (109.105.153.25)

e...@inn.ru
Server at inn.ru (109.105.153.25) returned '400 4.4.7 Message delayed'
1/23/2017 8:53:04 AM - Server at inn.ru (109.105.153.25) returned '441 4.4.1 
Error communicating with target host: "Failed to connect. Winsock error code: 
10060, Win32 error code: 10060." Last endpoint attempted was 109.105.153.25:25'




> Launch Kafka from within Apache Mesos
> -
>
> Key: KAFKA-1207
> URL: https://issues.apache.org/jira/browse/KAFKA-1207
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>  Labels: mesos
> Attachments: KAFKA-1207_2014-01-19_00:04:58.patch, 
> KAFKA-1207_2014-01-19_00:48:49.patch, KAFKA-1207.patch
>
>
> There are a few components to 

[jira] [Updated] (KAFKA-4684) Kafka does not offer kafka-configs.bat on Windows box

2017-01-23 Thread huxi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxi updated KAFKA-4684:

Labels: newbie  (was: )

> Kafka does not offer kafka-configs.bat on Windows box
> -
>
> Key: KAFKA-4684
> URL: https://issues.apache.org/jira/browse/KAFKA-4684
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: huxi
>Assignee: huxi
>Priority: Minor
>  Labels: newbie
>
> Kafka does not ship with kafka-configs.bat, so it's a little inconvenient to 
> add or modify configs on the Windows platform.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-01-23 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834053#comment-15834053
 ] 

huxi edited comment on KAFKA-4669 at 1/23/17 8:25 AM:
--

Maybe we could invoke the alternative method `ProduceRequestResult#await(long, 
TimeUnit)` and specify a reasonable timeout value (e.g. 30 seconds, or 
request.timeout.ms) in RecordAccumulator#awaitFlushCompletion. If the await 
times out, we log a warning to tell users that some requests were never marked 
as complete, and flush will no longer be stuck, as shown below:

{code:borderStyle=solid}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all()) {
            if (!batch.produceFuture.await(30, TimeUnit.SECONDS)) {
                log.warn("Did not complete the produce request for {} in 30 seconds.",
                         batch.produceFuture.topicPartition());
            }
        }
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

[~becket_qin] Does it make sense?
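
For what it's worth, here is a small, self-contained JDK-only illustration (not 
Kafka code; the class name, the exception message and the 3-second timeout are 
placeholders) of why an untimed await hangs forever once the thread that should 
complete the future has died, while the timed await above simply returns false:

{code:borderStyle=solid}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushHangDemo {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a RecordBatch's produceFuture: only completed when done() is called.
        CountDownLatch produceFuture = new CountDownLatch(1);

        // Simulate the sender I/O thread dying before it can complete the batch,
        // which is what happens when handleCompletedReceives throws after completeNext().
        Thread sender = new Thread(() -> {
            throw new IllegalStateException("Correlation id for response does not match request");
        });
        sender.setUncaughtExceptionHandler((t, e) ->
                System.out.println("sender thread died: " + e.getMessage()));
        sender.start();
        sender.join();

        // An untimed produceFuture.await() here would block forever - the reported flush() hang.
        // The timed variant returns false instead, so the caller can log a warning and move on.
        boolean completed = produceFuture.await(3, TimeUnit.SECONDS);
        System.out.println("produce future completed within timeout: " + completed);
    }
}
{code}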


was (Author: huxi_2b):
Maybe we could invoke the alternative method `ProduceRequestResult#await(long, 
TimeUnit)` and specify a reasonable timeout value in 
RecordAccumulator#awaitFlushCompletion. If the await times out, we log a warning 
to tell users that some requests were never marked as complete, and flush will 
no longer be stuck, as shown below:

{code:borderStyle=solid}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all()) {
            if (!batch.produceFuture.await(30, TimeUnit.SECONDS)) {
                log.warn("Did not complete the produce request for {} in 30 seconds.",
                         batch.produceFuture.topicPartition());
            }
        }
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

[~becket_qin] Does it make sense?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never be called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug does exist in the 0.10 versions.
> A real case: first, a correlation-id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>   at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2017-01-23 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834053#comment-15834053
 ] 

huxi commented on KAFKA-4669:
-

Maybe we could invoke the alternative method `ProduceRequestResult#await(long, 
TimeUnit)` and specify a reasonable timeout value in 
RecordAccumulator#awaitFlushCompletion. If the await times out, we log a warning 
to tell users that some requests were never marked as complete, and flush will 
no longer be stuck, as shown below:

{code:borderStyle=solid}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all()) {
            if (!batch.produceFuture.await(30, TimeUnit.SECONDS)) {
                log.warn("Did not complete the produce request for {} in 30 seconds.",
                         batch.produceFuture.topicPartition());
            }
        }
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

[~becket_qin] Does it make sense?

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws 
> exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1
>Reporter: Cheng Ju
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an 
> exception is thrown after inFlightRequests.completeNext(source), then the 
> corresponding RecordBatch's done will never be called, and 
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug does exist in the 0.10 versions.
> A real case: first, a correlation-id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] 
> (org.apache.kafka.clients.producer.internals.Sender.run:130)  - Uncaught 
> error in kafka producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>   at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>   at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>   at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>   at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>   at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>   at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>   at java.lang.Thread.run(Thread.java:745)
> client code 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)